基于Google最新Agents论文的学习文档


导语

原链接:Google AI Agents Technical Guide

这份白皮书是 Google 对AI Agent 从概念到生产部署的完整技术路线图,涵盖了:

  1. ✅ AI Agent 的定义和分类
  2. ✅ Agent 开发框架(ADK)
  3. ✅ 企业级智能体的架构设计
  4. ✅ 从实验室到生产的技术转化
  5. ✅ 最佳实践和案例研究

Google选择用ADK 四层架构(LLM + Tool + Memory + Orchestration)来系统地组织和管理 Agent 的各个关键组件,使其能够协调工作,对当前相对还算混乱的Agent工程有一定的参考意义。


第一部分:基础概念

什么是 AI Agent

定义

AI Agent 是一个自主的软件系统,能够:

  • 📍 感知环境状态
  • 🧠 理解用户意图
  • 🤔 制定行动计划
  • 执行具体操作
  • 📊 评估执行结果
  • 🔄 反馈调整策略

基本循环

1
感知 (Perceive) → 思考 (Think) → 行动 (Act) → 反思 (Reflect) → 循环

与传统程序的对比:

1
2
3
传统程序:输入 → 处理 → 输出(一次性执行)

AI Agent:用户意图 → 理解 → 规划 → 执行 → 监控 → 优化(持续循环)

Agent 的核心特性

1. 自主性(Autonomy)

Agent 能够在没有外部干预的情况下独立完成任务。

示例:订单处理 Agent

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 非自主系统
def process_order(order_id):
order = get_order(order_id)
if order.status == "unpaid":
print("订单未支付,请人工处理") # 停滞
return order

# 自主 Agent
async def autonomous_order_agent(order_id):
while True:
order = get_order(order_id)
if order.status == "unpaid":
# 自主检查用户余额
balance = check_user_balance(order.user_id)
if balance >= order.amount:
# 自主发起支付
payment_result = process_payment(order)
if payment_result.success:
update_order_status(order_id, "paid")
else:
# 自主发送提醒
send_reminder(order.user_id, "余额不足")

await asyncio.sleep(3600) # 每小时检查一次

2. 反应性(Responsiveness)

Agent 能够及时对环境变化做出响应。

特点:

  • 实时感知事件
  • 快速决策
  • 立即执行响应

代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class ResponsiveAgent:
def __init__(self):
self.event_queue = asyncio.Queue()
self.handlers = {}

async def on_user_message(self, message: str):
"""实时处理用户消息"""
await self.event_queue.put({
'type': 'user_message',
'data': message,
'timestamp': time.time()
})

async def process_events(self):
"""持续处理事件"""
while True:
event = await self.event_queue.get()

# 立即响应
handler = self.handlers.get(event['type'])
if handler:
response = await handler(event['data'])
await self.send_response(response)

self.event_queue.task_done()

async def send_response(self, response: str):
"""实时发送响应"""
print(f"Agent 回复: {response}")

3. 主动性(Proactiveness)

Agent 不仅被动响应,还能主动采取行动。

vs 被动式系统:

1
2
3
4
5
6
7
被动式(Reactive):
用户输入 → 系统处理 → 返回结果
(只做被要求的事)

主动式(Proactive):
系统监控 → 发现问题 → 主动解决
(主动发现和处理问题)

实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class ProactiveAgent:
"""主动型 Agent 示例"""

async def monitor_system_health(self):
"""主动监控系统健康度"""
while True:
health = self.check_system_status()

# 主动发现问题
if health['cpu_usage'] > 80:
# 主动采取行动,而不是等待告警
await self.trigger_auto_scaling()
await self.notify_admin("CPU 使用率过高,已自动扩容")

if health['memory_usage'] > 85:
# 主动清理缓存
await self.cleanup_cache()
await self.log_alert("内存使用率过高,已清理缓存")

await asyncio.sleep(60)

async def predict_and_prevent(self):
"""预测问题并提前防止"""
# 分析历史数据
forecast = self.predict_traffic_spike()
if forecast['high_probability']:
# 在问题发生前主动扩容
await self.prepare_resources(forecast['expected_load'])

4. 社交能力(Social Ability)

Agent 能够与其他 Agent、系统和用户进行沟通和协作。

多 Agent 协作:

1
2
3
Agent 1(数据分析)→ 共享数据
Agent 2(决策引擎)→ 共享知识库 & 资源
Agent 3(执行引擎)

Agent 的演进史

代数 时期 技术 特点
一代 1970s-1990s 规则引擎 IF-THEN 规则,静态决策
二代 2000s-2010s 机器学习 从数据学习,自适应决策
三代 2012-2022 深度学习 端到端学习,非结构化数据
四代 2023-现在 LLM Agent 通用推理,零样本学习,工具调用

Agent 的分类

按决策方式

  1. 反应式 Agent - 直接响应输入,无内部状态
  2. 带状态的 Agent - 维护内部状态,基于状态决策
  3. 规划式 Agent - 制定计划后执行,支持复杂多步骤任务

按学习能力

  1. 非学习 Agent - 规则固定,不改进
  2. 学习 Agent - 从数据学习,持续改进

按任务类型

  1. 信息收集 Agent - 获取和整理信息
  2. 决策制定 Agent - 做出最优决策
  3. 执行 Agent - 执行具体操作
  4. 监控 Agent - 持续监控系统状态

Agent vs 传统软件

特性 传统软件 AI Agent
编程方式 显式编码逻辑 高层意图描述
灵活性
新场景适应 需要重新编码 自动适应
学习能力
开发速度
运营成本 高(模型成本)

具体对比:文本分类

传统方式:手工编写所有规则,新增分类需要修改代码

Agent 方式

1
直接描述任务 → 模型自动理解和分类 → 自动学习新分类

Agent 应用场景

场景 1:客户服务

优势

  • 24/7 自动响应
  • 常见问题自动解决
  • 毫秒级响应
  • 成本大幅降低

场景 2:内容创作

优势

  • 快速生成初稿
  • 多角度内容生成
  • 可快速迭代
  • 成本低

场景 3:数据分析

优势

  • 非技术人员可用
  • 即时获得洞察
  • 自动生成报告

场景 4:流程自动化

优势

  • 自动审批工作流
  • 无需人工干预
  • 提高效率

第二部分:Agent 开发框架(ADK)

ADK 框架概览

Google 提供的 ADK(Agent Development Kit)是一个企业级的 Agent 开发工具包,采用四层架构:

1
2
3
4
5
6
7
8
9
10
11
12
13
┌──────────────────────────────────┐
│ 应用层:业务逻辑实现 │
├──────────────────────────────────┤
│ 编排层:任务流程和工作流 │
├──────────────────────────────────┤
│ 能力层:LLM、工具、存储、集成 │
│ ├─ LLM 推理引擎 │
│ ├─ 工具管理系统 │
│ ├─ 记忆存储系统 │
│ └─ 外部集成接口 │
├──────────────────────────────────┤
│ 基础设施层:计算、存储、监控、安全 │
└──────────────────────────────────┘

架构设计原则

  1. 分层设计 - 各层职责明确,便于独立开发和测试
  2. 解耦设计 - 组件间松耦合,支持替换和升级
  3. 可扩展性 - 支持添加新工具、集成新模型、定制流程
  4. 可靠性 - 完善的错误处理、重试机制、降级策略

核心组件详解

1. LLM 推理引擎

功能

1
输入处理 → 提示构建 → 模型推理 → 输出解析 → 结果返回

关键能力

  • 提示工程
  • 函数调用
  • 上下文管理
  • 输出解析

完整实现示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
from typing import Optional, List, Dict, Any
import asyncio
import time

class GeminiLLMEngine:
"""Gemini LLM 引擎实现"""

def __init__(self, api_key: str, model_name: str = "gemini-pro"):
self.api_key = api_key
self.model_name = model_name
self.client = self._init_client()
self.call_history = []

def _init_client(self):
"""初始化 Gemini 客户端"""
import google.generativeai as genai
genai.configure(api_key=self.api_key)
return genai.GenerativeModel(self.model_name)

async def generate(
self,
prompt: str,
temperature: float = 0.7,
max_tokens: int = 2048,
tools: Optional[List[Dict]] = None
) -> str:
"""生成响应"""
try:
generation_config = {
'temperature': temperature,
'max_output_tokens': max_tokens,
}

response = await asyncio.to_thread(
self.client.generate_content,
prompt,
generation_config=generation_config
)

# 记录调用
self.call_history.append({
'prompt': prompt,
'response': response.text,
'timestamp': time.time()
})

return response.text

except Exception as e:
print(f"LLM 调用失败: {e}")
raise

async def generate_with_function_call(
self,
prompt: str,
tools: List[Dict]
) -> Dict[str, Any]:
"""生成响应并支持函数调用"""

response = await asyncio.to_thread(
self.client.generate_content,
prompt,
tools=[self._format_tool(tool) for tool in tools]
)

return self._parse_response(response, tools)

def _format_tool(self, tool: Dict) -> Dict:
"""格式化工具定义"""
return {
'type': 'function',
'function': {
'name': tool['name'],
'description': tool['description'],
'parameters': tool.get('parameters', {})
}
}

def _parse_response(self, response, tools: List[Dict]) -> Dict[str, Any]:
"""解析模型响应"""
if hasattr(response, 'function_calls') and response.function_calls:
function_calls = []
for call in response.function_calls:
function_calls.append({
'name': call.name,
'arguments': dict(call.args)
})

return {
'type': 'function_call',
'function_calls': function_calls,
'text': response.text
}
else:
return {
'type': 'text',
'text': response.text
}

模型选择标准

  • 推理能力
  • 成本效益比
  • 延迟要求
  • Token 上限

2. 工具管理系统

职责

  • 工具注册和发现
  • 工具调用执行
  • 结果处理
  • 错误恢复

完整实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
from typing import Callable, Any, Dict, List
from dataclasses import dataclass
import inspect
import time

@dataclass
class ToolDefinition:
"""工具定义"""
name: str
description: str
func: Callable
parameters: Dict[str, Any]
required_params: List[str]


class ToolRegistry:
"""工具注册表"""

def __init__(self):
self.tools: Dict[str, ToolDefinition] = {}

def register_tool(
self,
name: str,
description: str,
required_params: List[str] = None
):
"""装饰器:注册工具"""
def decorator(func: Callable):
sig = inspect.signature(func)
parameters = {}

for param_name, param in sig.parameters.items():
if param_name == 'self':
continue

param_type = param.annotation
if param_type == inspect.Parameter.empty:
param_type = 'any'

parameters[param_name] = {
'type': str(param_type),
'required': param_name in (required_params or [])
}

tool_def = ToolDefinition(
name=name,
description=description,
func=func,
parameters=parameters,
required_params=required_params or []
)

self.tools[name] = tool_def
return func

return decorator

async def call_tool(self, name: str, **kwargs) -> Any:
"""调用工具"""
tool_def = self.tools[name]

# 验证参数
missing_params = [
p for p in tool_def.required_params
if p not in kwargs
]
if missing_params:
raise ValueError(f"缺少必需参数: {missing_params}")

# 调用工具函数
func = tool_def.func
if inspect.iscoroutinefunction(func):
return await func(**kwargs)
else:
return func(**kwargs)


class ToolExecutor:
"""工具执行器 - 负责工具的安全执行"""

def __init__(self, registry: ToolRegistry, timeout: int = 30):
self.registry = registry
self.timeout = timeout
self.execution_log = []

async def execute_tool_with_retry(
self,
tool_name: str,
max_retries: int = 3,
backoff_factor: float = 2.0,
**kwargs
) -> Dict[str, Any]:
"""执行工具,支持重试"""

for attempt in range(max_retries):
try:
result = await asyncio.wait_for(
self.registry.call_tool(tool_name, **kwargs),
timeout=self.timeout
)

self.execution_log.append({
'tool': tool_name,
'status': 'success',
'attempt': attempt + 1,
'result': result,
'timestamp': time.time()
})

return {'success': True, 'result': result}

except asyncio.TimeoutError:
print(f"工具 {tool_name} 执行超时 (尝试 {attempt + 1}/{max_retries})")
if attempt < max_retries - 1:
await asyncio.sleep(backoff_factor ** attempt)
else:
return {
'success': False,
'error': f'工具执行超时(已重试 {max_retries} 次)'
}

except Exception as e:
print(f"工具 {tool_name} 执行失败: {e}")
self.execution_log.append({
'tool': tool_name,
'status': 'error',
'attempt': attempt + 1,
'error': str(e),
'timestamp': time.time()
})

if attempt < max_retries - 1:
await asyncio.sleep(backoff_factor ** attempt)
else:
return {
'success': False,
'error': str(e),
'attempts': max_retries
}


# 使用示例
class OrderManagementTools:
"""订单管理工具集"""

def __init__(self, registry: ToolRegistry):
self.registry = registry

def setup_tools(self):
"""设置所有工具"""
self.registry.register_tool(
name='get_order_status',
description='获取订单状态',
required_params=['order_id']
)(self.get_order_status)

def get_order_status(self, order_id: str) -> Dict:
"""获取订单状态"""
# 模拟数据库查询
return {
'order_id': order_id,
'status': 'paid',
'items': ['商品1', '商品2'],
'total': 199.99
}

工具种类

  • API 集成
  • 数据库查询
  • 文件操作
  • 第三方服务

关键特性

  • 工具定义标准化
  • 参数验证
  • 权限检查
  • 执行重试

3. 内存系统

分层设计

内存类型 特点 用途
会话记忆 短期,内存存储 当前对话上下文
持久化记忆 长期,数据库存储 用户档案、历史记录
知识库 向量存储 业务知识、常见问题

完整实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
from typing import List, Dict, Any
from dataclasses import dataclass
from datetime import datetime
import json

@dataclass
class Message:
"""消息记录"""
role: str # 'user' 或 'assistant'
content: str
timestamp: datetime
metadata: Dict = None


class SessionMemory:
"""会话(短期)记忆"""

def __init__(self, session_id: str, max_size: int = 100):
self.session_id = session_id
self.messages: List[Message] = []
self.max_size = max_size
self.created_at = datetime.now()

def add_message(self, role: str, content: str, metadata: Dict = None):
"""添加消息"""
message = Message(
role=role,
content=content,
timestamp=datetime.now(),
metadata=metadata or {}
)

self.messages.append(message)

# 如果超过最大数量,删除最早的消息
if len(self.messages) > self.max_size:
self.messages.pop(0)

def get_context(self, last_n: int = 10) -> str:
"""获取最后 N 条消息的上下文"""
recent_messages = self.messages[-last_n:]

context = ""
for msg in recent_messages:
if msg.role == 'user':
context += f"用户: {msg.content}\n"
else:
context += f"Assistant: {msg.content}\n"

return context

def to_dict(self) -> Dict:
"""转换为字典(用于序列化)"""
return {
'session_id': self.session_id,
'created_at': self.created_at.isoformat(),
'messages': [
{
'role': msg.role,
'content': msg.content,
'timestamp': msg.timestamp.isoformat(),
'metadata': msg.metadata
}
for msg in self.messages
]
}


class KnowledgeBase:
"""知识库"""

def __init__(self):
self.documents = {}
self.embedder = self._init_embedder()

def _init_embedder(self):
"""初始化文本嵌入模型"""
return None # 实际应该使用真实的嵌入模型

async def add_document(
self,
doc_id: str,
content: str,
metadata: Dict = None
):
"""添加文档到知识库"""
self.documents[doc_id] = {
'content': content,
'created_at': datetime.now().isoformat(),
'metadata': metadata or {}
}

async def search(
self,
query: str,
top_k: int = 5
) -> List[Dict]:
"""搜索相关文档"""
# 简单的文本匹配搜索(实际应该用向量相似度)
results = []
for doc_id, doc in self.documents.items():
if query.lower() in doc['content'].lower():
results.append({
'doc_id': doc_id,
'content': doc['content'],
'score': 0.8
})

return results[:top_k]

实现方案

1
会话记忆(内存) → 持久化记忆(DB) → 知识库(向量DB)

4. 任务编排引擎

功能

  • 工作流定义
  • 任务分配
  • 状态管理
  • 依赖处理

完整实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
from enum import Enum
from typing import List, Dict, Callable, Optional, Any

class TaskStatus(Enum):
"""任务状态"""
PENDING = "pending"
RUNNING = "running"
SUCCESS = "success"
FAILED = "failed"
RETRYING = "retrying"


class Task:
"""任务定义"""
def __init__(
self,
task_id: str,
name: str,
action: str,
parameters: Dict,
dependencies: List[str] = None
):
self.task_id = task_id
self.name = name
self.action = action
self.parameters = parameters
self.dependencies = dependencies or []


class WorkflowEngine:
"""工作流引擎"""

def __init__(self):
self.tasks: Dict[str, Task] = {}
self.task_status: Dict[str, TaskStatus] = {}
self.task_results: Dict[str, Any] = {}
self.execution_log = []

def define_task(
self,
task_id: str,
name: str,
action: str,
parameters: Dict,
dependencies: List[str] = None
) -> Task:
"""定义任务"""
task = Task(
task_id=task_id,
name=name,
action=action,
parameters=parameters,
dependencies=dependencies or []
)
self.tasks[task_id] = task
self.task_status[task_id] = TaskStatus.PENDING
return task

async def execute_workflow(self) -> Dict[str, Any]:
"""执行工作流"""

# 拓扑排序:确定执行顺序
execution_order = self._topological_sort()

# 执行任务
for task_id in execution_order:
task = self.tasks[task_id]

# 检查依赖是否完成
if not self._check_dependencies(task):
self.task_status[task_id] = TaskStatus.FAILED
self.execution_log.append({
'task_id': task_id,
'status': 'failed',
'reason': '依赖任务失败'
})
continue

# 执行任务
try:
self.task_status[task_id] = TaskStatus.RUNNING

result = await self._execute_task(task)

self.task_status[task_id] = TaskStatus.SUCCESS
self.task_results[task_id] = result

self.execution_log.append({
'task_id': task_id,
'status': 'success',
'result': result
})

except Exception as e:
self.task_status[task_id] = TaskStatus.FAILED
self.execution_log.append({
'task_id': task_id,
'status': 'failed',
'error': str(e)
})

return {
'status': self._get_workflow_status(),
'results': self.task_results,
'log': self.execution_log
}

def _topological_sort(self) -> List[str]:
"""拓扑排序:按依赖关系排序任务"""
visited = set()
stack = []

def visit(task_id):
if task_id in visited:
return
visited.add(task_id)

task = self.tasks[task_id]
for dep in task.dependencies:
visit(dep)

stack.append(task_id)

for task_id in self.tasks:
visit(task_id)

return stack

def _check_dependencies(self, task: Task) -> bool:
"""检查任务依赖是否全部完成"""
for dep_id in task.dependencies:
if self.task_status.get(dep_id) != TaskStatus.SUCCESS:
return False
return True

async def _execute_task(self, task: Task) -> Any:
"""执行单个任务"""
print(f"执行任务: {task.name}")
await asyncio.sleep(1) # 模拟执行
return f"{task.name} 执行完成"

def _get_workflow_status(self) -> str:
"""获取工作流状态"""
statuses = set(self.task_status.values())
if TaskStatus.FAILED in statuses:
return 'failed'
elif TaskStatus.RUNNING in statuses:
return 'running'
else:
return 'success'


# 使用示例
async def example_workflow():
engine = WorkflowEngine()

# 定义工作流
engine.define_task('task1', '数据收集', 'collect_data', {})
engine.define_task(
'task2',
'数据验证',
'validate_data',
{},
dependencies=['task1']
)
engine.define_task(
'task3',
'数据分析',
'analyze_data',
{},
dependencies=['task2']
)

# 执行工作流
result = await engine.execute_workflow()
print("工作流执行结果:", result)

流程

1
定义任务 → 构建依赖关系 → 拓扑排序 → 按序执行 → 结果聚合

提示工程最佳实践

提示模板结构

1
2
3
4
5
6
1. 角色定义 - "你是一个..."
2. 任务描述 - "你需要..."
3. 上下文信息 - "背景是..."
4. 输入数据 - "以下是..."
5. 输出格式 - "请以...格式输出"
6. 约束条件 - "注意事项"

优化技巧

  • 明确性 - 详细说明任务要求
  • 结构化 - 使用清晰的格式和段落
  • 示例 - 提供具体示例
  • 约束 - 设置明确的限制条件

工具系统设计

工具定义规范

1
2
3
4
5
6
7
8
9
10
11
12
13
14
{
"name": "get_order_status",
"description": "获取订单状态和详情",
"parameters": {
"type": "object",
"properties": {
"order_id": {
"type": "string",
"description": "订单号"
}
},
"required": ["order_id"]
}
}

工具执行流程

1
2
3
4
5
1. 参数验证
2. 权限检查
3. 执行工具
4. 结果处理
5. 错误处理

错误处理策略

  • 重试机制 - 指数退避重试
  • 降级方案 - 使用备选方案
  • 超时控制 - 设置合理超时
  • 日志记录 - 详细记录所有操作

内存系统实现

会话记忆

存储当前对话的消息和上下文。

1
2
特点:易失性、快速、有限容量
生命周期:会话创建到会话结束

持久化记忆

存储用户档案、历史对话、学习结果。

1
2
特点:持久性、查询灵活、无容量限制
数据库选择:PostgreSQL、MongoDB 等

知识库

使用向量数据库存储企业知识。

1
2
特点:高维向量、相似度搜索、快速检索
使用场景:FAQ、文档检索、知识问答

工作流编排

工作流定义

1
2
3
4
5
6
7
8
# 定义任务依赖关系
task1 (数据收集)

task2 (数据验证)

task3 (数据分析)

task4 (报告生成)

拓扑排序

确保任务按正确的依赖关系执行。

并行执行

当任务无依赖关系时,可并行执行以提高效率。

故障处理

  • 重试机制
  • 回滚策略
  • 部分成功处理

第三部分:生产部署和最佳实践

生产级 Agent 架构

完整架构设计

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
┌──────────────┐
│ 用户层 │ Web、App、API、集成
└──────┬───────┘

┌──────▼──────────────────────────┐
│ 网关层(Gateway) │
│ 认证、授权、限流、请求验证 │
└──────┬──────────────┬────────────┘
│ │
┌──────▼───────┐ ┌──▼──────────┐
│ Agent 编排层 │ │ 缓存层 │
│ 负载均衡 │ │ Redis │
│ Agent 管理 │ │ Session │
└──────┬───────┘ └─────────────┘

┌──────▼────────────────────────────┐
│ Agent 处理层 │
│ Agent 1、Agent 2、...、Agent N │
└──────┬────────────────────────────┘

┌──────▼────────────────────────────┐
│ 能力层 │
│ 工具库、内存系统、安全服务 │
└──────┬────────────────────────────┘

┌──────▼────────────────────────────┐
│ 监控层 │
│ 日志、指标、链路追踪、告警 │
└───────────────────────────────────┘

高可用设计

关键特性

  • 多实例部署
  • 自动故障转移
  • 健康检查
  • 自动恢复

完整实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
class HighAvailabilityAgent:
"""高可用 Agent 实现"""

def __init__(self, config: Dict):
self.config = config
self.instance_id = self._generate_instance_id()
self.is_healthy = True
self.metrics = {
'requests_processed': 0,
'requests_failed': 0,
'average_latency': 0
}

async def process_request(
self,
request: Dict[str, Any],
timeout: int = 30,
max_retries: int = 3
) -> Dict[str, Any]:
"""处理请求"""

start_time = time.time()
attempt = 0
last_error = None

while attempt < max_retries:
try:
if not self.is_healthy:
await self.recover()

# 执行请求
result = await asyncio.wait_for(
self._process_request_internal(request),
timeout=timeout
)

# 更新指标
latency = time.time() - start_time
self.metrics['requests_processed'] += 1
self.metrics['average_latency'] = (
(self.metrics['average_latency'] * (self.metrics['requests_processed'] - 1) +
latency) / self.metrics['requests_processed']
)

return {
'success': True,
'data': result,
'latency': latency
}

except asyncio.TimeoutError:
attempt += 1
last_error = f"请求超时(尝试 {attempt}/{max_retries})"
await asyncio.sleep(2 ** attempt)

except Exception as e:
attempt += 1
last_error = str(e)
await asyncio.sleep(2 ** attempt)

# 所有重试都失败了
self.metrics['requests_failed'] += 1
self.is_healthy = False

return {
'success': False,
'error': last_error,
'attempts': max_retries
}

async def health_check(self) -> bool:
"""健康检查"""
try:
# 检查各个组件的健康状态
self.is_healthy = await self._check_all_components()
return self.is_healthy

except Exception as e:
print(f"健康检查失败: {e}")
self.is_healthy = False
return False

async def recover(self):
"""故障恢复"""
print(f"开始恢复 Agent {self.instance_id}")
try:
await self._reinit_connections()
if await self.health_check():
print(f"Agent {self.instance_id} 恢复成功")
else:
print(f"Agent {self.instance_id} 恢复失败")
except Exception as e:
print(f"恢复过程出错: {e}")

def _generate_instance_id(self) -> str:
"""生成实例 ID"""
import uuid
return str(uuid.uuid4())

async def _process_request_internal(self, request: Dict) -> Any:
"""实际处理逻辑"""
pass

async def _check_all_components(self) -> bool:
"""检查所有组件"""
return True

async def _reinit_connections(self):
"""重新初始化连接"""
pass

故障转移流程

1
主 Agent 失败 → 健康检查失败 → 流量转移到备用 Agent → 自动恢复

可观测性和监控

关键指标

1. 延迟指标

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
class MetricsCollector:
"""指标收集器"""

def __init__(self):
self.metrics: Dict[str, List[float]] = {}
self.thresholds = {
'latency_p99': 5000, # 毫秒
'error_rate': 1, # 百分比
'token_usage_ratio': 0.8
}

def record_metric(self, name: str, value: float, tags: Dict = None):
"""记录指标"""
if name not in self.metrics:
self.metrics[name] = []
self.metrics[name].append(value)

def get_percentile(self, metric_name: str, percentile: int) -> float:
"""获取指标的百分位数"""
if metric_name not in self.metrics:
return 0

values = sorted(self.metrics[metric_name])
index = int(len(values) * (percentile / 100))
return values[index] if index < len(values) else 0

def check_threshold_violations(self) -> List[Dict]:
"""检查阈值违规"""
violations = []

for metric_name, threshold in self.thresholds.items():
if metric_name in self.metrics:
latest_value = self.metrics[metric_name][-1]

if latest_value > threshold:
violations.append({
'metric': metric_name,
'threshold': threshold,
'current_value': latest_value,
'severity': self._calculate_severity(
latest_value, threshold
)
})

return violations

def _calculate_severity(self, value: float, threshold: float) -> str:
"""计算严重程度"""
ratio = value / threshold
if ratio > 1.5:
return 'critical'
elif ratio > 1.2:
return 'high'
else:
return 'warning'
  • P50、P95、P99 响应时间
  • 各组件耗时分析

2. 可靠性指标

  • 错误率
  • 成功率
  • 重试率

3. 资源指标

  • CPU 使用率
  • 内存使用率
  • Token 使用量

4. 业务指标

  • 请求数
  • 转化率
  • 用户满意度

监控方案

实时监控

1
指标收集 → 聚合 → 分析 → 告警 → 自动响应

链路追踪

1
2
3
请求 → 网关 → Agent → LLM → 工具 → 响应
↓ ↓ ↓ ↓ ↓
时间 时间 时间 时间 时间

告警策略

  • 阈值告警 - 指标超过阈值
  • 异常检测 - 突发变化告警
  • 关联告警 - 多指标关联分析

安全和隐私

认证和授权

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
import hashlib
import hmac
from datetime import datetime

class SecurityManager:
"""安全管理器"""

def __init__(self):
self.audit_log = []
self.rate_limiters = {}
self.blocked_users = set()

def authenticate_request(self, token: str) -> Dict[str, Any]:
"""认证请求"""
try:
# 验证 token 签名
payload = self._verify_token(token)

# 检查 token 是否过期
if payload['exp'] < datetime.now().timestamp():
return {'authenticated': False, 'error': 'token已过期'}

# 检查用户是否被限制
user_id = payload['user_id']
if user_id in self.blocked_users:
return {'authenticated': False, 'error': '用户已被限制'}

return {'authenticated': True, 'user_id': user_id}

except Exception as e:
return {'authenticated': False, 'error': str(e)}

def authorize_action(
self,
user_id: str,
resource: str,
action: str
) -> bool:
"""授权检查"""
# 检查用户是否有权限执行此操作
if action == 'delete_user':
user_role = self._get_user_role(user_id)
return user_role == 'admin'

return True

def rate_limit_check(self, user_id: str, limit: int = 100) -> bool:
"""速率限制检查"""
now = time.time()

if user_id not in self.rate_limiters:
self.rate_limiters[user_id] = []

# 清理一分钟外的请求
self.rate_limiters[user_id] = [
req_time for req_time in self.rate_limiters[user_id]
if now - req_time < 60
]

# 检查是否超过限制
if len(self.rate_limiters[user_id]) >= limit:
return False

# 记录新请求
self.rate_limiters[user_id].append(now)
return True

def log_audit_event(
self,
user_id: str,
action: str,
resource: str,
details: Dict = None
):
"""记录审计日志"""
self.audit_log.append({
'timestamp': datetime.now().isoformat(),
'user_id': user_id,
'action': action,
'resource': resource,
'details': details or {}
})

def _verify_token(self, token: str) -> Dict:
"""验证 token"""
# 实现 JWT 验证逻辑
pass

def _get_user_role(self, user_id: str) -> str:
"""获取用户角色"""
pass
1
2
3
1. 认证 - 确认用户身份(Token、OAuth)
2. 授权 - 检查用户权限(基于角色、基于资源)
3. 审计 - 记录所有操作

数据安全

  • 加密 - 传输层和存储层加密
  • PII 检测 - 检测个人可识别信息
  • 匿名化 - 处理敏感数据

隐私合规

  • GDPR - 数据保留、删除权利
  • CCPA - 数据透明性
  • 本地法规 - 依据地区法规处理

性能优化

缓存策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
from functools import wraps
import json

class CacheManager:
"""缓存管理器"""

def __init__(self):
self.memory_cache = {}
self.cache_stats = {
'hits': 0,
'misses': 0,
'evictions': 0
}
self.max_memory = 100 * 1024 * 1024 # 100MB

def get(self, key: str):
"""获取缓存"""
if key in self.memory_cache:
self.cache_stats['hits'] += 1
return self.memory_cache[key]['value']

self.cache_stats['misses'] += 1
return None

def set(self, key: str, value: Any, ttl: int = 3600):
"""设置缓存"""
self.memory_cache[key] = {
'value': value,
'expiry': time.time() + ttl,
'access_count': 0,
'last_accessed': time.time()
}

def cache_result(self, ttl: int = 3600):
"""缓存装饰器"""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
# 生成缓存键
cache_key = self._generate_cache_key(func.__name__, args, kwargs)

# 尝试从缓存获取
cached_value = self.get(cache_key)
if cached_value is not None:
return cached_value

# 执行函数
result = await func(*args, **kwargs)

# 存储到缓存
self.set(cache_key, result, ttl)

return result

return wrapper
return decorator

def _generate_cache_key(self, func_name: str, args: tuple, kwargs: dict) -> str:
"""生成缓存键"""
key_data = {
'func': func_name,
'args': args,
'kwargs': kwargs
}

key_str = json.dumps(key_data, sort_keys=True)
return hashlib.md5(key_str.encode()).hexdigest()

三层缓存

1
2
3
1. 请求缓存 - 相同请求的结果
2. 计算缓存 - 中间结果缓存
3. 知识缓存 - 知识库缓存

缓存失效策略

  • TTL 过期
  • LRU 淘汰
  • 主动更新

异步处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
class AsyncTaskQueue:
"""异步任务队列"""

def __init__(self, max_workers: int = 10):
self.queue = asyncio.Queue()
self.max_workers = max_workers
self.workers = []
self.results = {}

async def start(self):
"""启动工作线程"""
self.workers = [
asyncio.create_task(self._worker())
for _ in range(self.max_workers)
]

async def add_task(self, task_id: str, task: Callable, *args, **kwargs):
"""添加任务"""
await self.queue.put({
'task_id': task_id,
'task': task,
'args': args,
'kwargs': kwargs
})

async def _worker(self):
"""工作线程"""
while True:
task_item = await self.queue.get()

try:
result = await task_item['task'](
*task_item['args'],
**task_item['kwargs']
)

self.results[task_item['task_id']] = {
'status': 'success',
'result': result
}

except Exception as e:
self.results[task_item['task_id']] = {
'status': 'failed',
'error': str(e)
}

finally:
self.queue.task_done()
1
2
3
4
5
同步操作:请求 → 处理 → 响应(等待时间长)

异步操作:请求 → 加入队列 → 立即响应

后台处理 → 结果回调

Token 优化

  • 提示压缩 - 删除不必要内容
  • 缓存复用 - 缓存常见提示
  • 模型选择 - 选择高效的模型

部署策略

分阶段部署

金丝雀部署

1
2
3
开发 → 测试 → 灰度(10%) → 全量(100%)

监控指标 → 自动回滚

Blue-Green 部署

1
2
3
4
5
Blue(当前)← 流量 → Green(新版本)

完整测试

流量切换

部署检查清单

  • 功能测试通过
  • 性能指标达标
  • 安全审查完成
  • 文档已更新
  • 灾备方案已验证

故障处理和恢复

故障转移

1
检测故障 → 标记不健康 → 流量转移 → 恢复尝试 → 健康检查

恢复流程

  1. 自动恢复 - 清理状态、重新初始化
  2. 手动干预 - 人工确认后恢复
  3. 部分恢复 - 关闭故障模块,其他继续运行

降级策略

1
2
全功能 → 核心功能 → 只读模式 → 离线模式
(错误率高) (故障严重) (完全不可用)

成本优化

Token 成本管理

1
监控 → 优化提示 → 使用缓存 → 批量处理 → 模型选择

成本控制

  • 设置月度预算
  • 监控实时消费
  • 自动告警
  • 触发降级

基础设施成本

  • 使用按需付费
  • 自动扩缩容
  • 资源共享
  • 选择成本效益最优方案

第四部分:实战案例分析

电商客服 Agent 完整实现

功能设计

1
2
3
4
5
6
7
8
9
10
11
用户询问

意图理解(LLM)

↙ ↓ ↘
常见问题 需要操作 需要人工
│ │ │
├→缓存回复 ├→调用工具 ├→转接客服
└→生成回复

返回结果

完整代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
class EcommerceCustomerServiceAgent:
"""电商客服 Agent"""

def __init__(self):
self.llm = GeminiLLMEngine(api_key="your_api_key")
self.tools = OrderManagementTools(ToolRegistry())
self.memory = {
'session': SessionMemory('default'),
'persistent': None # 连接真实数据库
}
self.security = SecurityManager()
self.metrics = MetricsCollector()
self.cache = CacheManager()

async def handle_customer_inquiry(
self,
user_id: str,
inquiry: str
) -> str:
"""处理客户询问"""

start_time = time.time()

try:
# 认证
auth_result = self.security.authenticate_request(f"user:{user_id}")
if not auth_result['authenticated']:
return "认证失败"

# 速率限制
if not self.security.rate_limit_check(user_id):
return "请求过于频繁,请稍后再试"

# 清理输入
clean_inquiry = inquiry.strip()

# 检查缓存
cached_response = self.cache.get(f"inquiry:{clean_inquiry}")
if cached_response:
self.metrics.record_metric('cache_hit', 1)
return cached_response

# 生成回复
response = await self.llm.generate_with_function_call(
prompt=self._build_customer_prompt(clean_inquiry),
tools=self.tools.registry.get_tool_definitions()
)

# 处理工具调用
if response['type'] == 'function_call':
for call in response['function_calls']:
tool_result = await self.tools.executor.execute_tool_with_retry(
call['name'],
**call['arguments']
)
# 集成工具结果
response['text'] = await self._integrate_tool_result(
response['text'],
tool_result
)

final_response = response['text']

# 缓存响应
self.cache.set(f"inquiry:{clean_inquiry}", final_response)

# 保存到记忆
self.memory['session'].add_message('user', clean_inquiry)
self.memory['session'].add_message('assistant', final_response)

# 记录指标
latency = time.time() - start_time
self.metrics.record_metric('inquiry_latency', latency)
self.metrics.record_metric('successful_responses', 1)

# 审计日志
self.security.log_audit_event(
user_id=user_id,
action='customer_inquiry',
resource='customer_service',
details={'inquiry': clean_inquiry}
)

return final_response

except Exception as e:
self.metrics.record_metric('failed_responses', 1)
return f"处理请求时出错: {str(e)}"

def _build_customer_prompt(self, inquiry: str) -> str:
"""构建客户服务提示"""
context = self.memory['session'].get_context()

return f"""
你是一个专业的客服代表。

客户询问:{inquiry}

对话历史:
{context}

请提供一个有帮助、友好的回复。
如果需要调用工具(查询订单、处理退款等),请使用可用的工具。
"""

async def _integrate_tool_result(self, response: str, tool_result: Any) -> str:
"""集成工具结果到回复"""
integrated = await self.llm.generate(
prompt=f"""
原始回复:{response}
工具结果:{json.dumps(tool_result)}

请基于工具结果更新回复,使其更加具体和有用。
"""
)
return integrated


# 使用示例
async def main():
agent = EcommerceCustomerServiceAgent()

# 处理用户询问
response = await agent.handle_customer_inquiry(
user_id="user-123",
inquiry="我的订单什么时候发货?"
)

print("Agent 回复:", response)

工具定义

1
2
3
4
5
1. get_order_status - 获取订单状态
2. process_refund - 处理退款
3. check_inventory - 查询库存
4. create_ticket - 创建工单
5. send_notification - 发送通知

处理流程

  1. 用户输入处理

    • 清理输入
    • 检查缓存
    • 认证授权
  2. 意图理解

    • 调用 LLM 理解意图
    • 提取关键信息
    • 判断所需操作
  3. 工具调用

    • 确定所需工具
    • 参数验证
    • 执行工具
    • 结果处理
  4. 响应生成

    • 整合工具结果
    • 生成自然语言
    • 格式化输出
  5. 反馈保存

    • 更新会话记忆
    • 保存持久化数据
    • 记录审计日志

性能指标

  • 响应时间 P99 < 2s
  • 成功率 > 95%
  • 自动解决率 > 70%
  • 用户满意度 > 4.5/5

总结和展望

关键要点总结

AI Agent 是自主、智能的软件系统
ADK 四层架构提供了完整的开发框架
生产部署需要完善的架构设计
监控、安全、成本是生产系统的重要考量
持续优化是保持竞争力的关键

学习路径建议

1
2
3
4
5
6
7
8
9
10
11
12
13
1️⃣  理解 Agent 基本概念

2️⃣ 学习 ADK 架构和组件

3️⃣ 掌握提示工程和工具使用

4️⃣ 实现一个简单的 Agent

5️⃣ 优化性能和成本

6️⃣ 部署到生产环境

7️⃣ 监控和持续改进

下一步行动

立即可做

  • 阅读 Google 官方白皮书
  • 实验 LLM API(Gemini、GPT)
  • 尝试构建简单 Agent
  • 了解工具调用机制

短期目标(1-3个月)

  • 实现完整的 Agent 系统
  • 集成多个工具
  • 部署到测试环境
  • 进行性能测试

长期目标(3-12个月)

  • 优化和调优
  • 生产环境部署
  • 持续监控改进
  • 扩展到多个业务场景

关键资源

  • 官方文档 - Google AI Agents Guide
  • 框架选择 - Langchain、AutoGen、CrewAI
  • 模型选择 - Gemini、GPT-4、Claude
  • 向量数据库 - Pinecone、Weaviate、Milvus
  • 监控工具 - Datadog、New Relic、Prometheus

祝你在 AI Agent 开发中取得成功!这是一个充满机遇的新领域。


文章作者: RickDamon
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 RickDamon !
 本篇
基于Google最新Agents论文的学习文档 基于Google最新Agents论文的学习文档
导语原链接:Google AI Agents Technical Guide 这份白皮书是 Google 对AI Agent 从概念到生产部署的完整技术路线图,涵盖了: ✅ AI Agent 的定义和分类 ✅ Agent 开发框架(ADK
下一篇 
Golang1.22非兼容性更新踩坑记录 Golang1.22非兼容性更新踩坑记录
导语坑的内容主要是golang在1.22版本中更新了循环中的临时变量,每次循环为单独的地址,而在旧版本中,所有循环中的临时变量,指向的是同一块内存。说的更详细点:在 Go 1.22 之前的版本中,for _, p := range arr
2024-10-18
  目录