SDK Custom Tools Guide
版本要求:本文档针对 CodeBuddy Agent SDK v0.1.24 及以上版本。
功能状态:SDK Custom Tools 是 CodeBuddy Agent SDK 的一项 Preview 功能。
本文档介绍如何在 CodeBuddy Agent SDK 中创建和使用自定义工具。自定义工具允许你定义专属的功能,让 Agent 能够调用它们来完成特定任务。
概述
Custom Tools 是 CodeBuddy Agent SDK 提供的一种通过 MCP(Model Context Protocol)创建自定义工具的方式。与配置外部 MCP 服务器不同,Custom Tools 允许你直接在应用程序中定义工具,无需单独的进程或服务器。
核心优势
- 内进程执行:工具在应用程序内执行,无需创建独立进程
- 类型安全:支持 TypeScript 完整的类型检查和类型推断
- 简化部署:无需单独部署 MCP 服务器,一切随应用部署
- 紧密集成:与应用程序共享内存和状态
- 零额外依赖:利用现有的 SDK 基础设施
快速开始
TypeScript
创建一个简单的计算器工具:
typescript
import { query, createSdkMcpServer, tool } from '@tencent-ai/agent-sdk';
import { z } from 'zod';
// 创建 MCP 服务器并定义工具
const calculatorServer = createSdkMcpServer('calculator', {
tools: [
tool({
name: 'add',
description: 'Add two numbers',
schema: z.object({
a: z.number().describe('First number'),
b: z.number().describe('Second number'),
}),
handler: async ({ a, b }) => {
return { result: a + b };
},
}),
tool({
name: 'multiply',
description: 'Multiply two numbers',
schema: z.object({
a: z.number().describe('First number'),
b: z.number().describe('Second number'),
}),
handler: async ({ a, b }) => {
return { result: a * b };
},
}),
],
});
// 在 SDK 中使用自定义工具
const result = query({
prompt: 'Calculate 15 + 27 and then multiply the result by 3',
options: {
mcpServers: {
'calculator': calculatorServer,
},
},
});
for await (const message of result) {
console.log(message);
}Python
Python SDK 使用装饰器模式定义工具:
python
from codebuddy_agent_sdk import query, create_sdk_mcp_server, tool
from typing import Any
# 定义工具
@tool(
"add",
"Add two numbers",
{"a": float, "b": float}
)
async def add(args: dict[str, Any]) -> dict[str, Any]:
return {'result': args['a'] + args['b']}
@tool(
"multiply",
"Multiply two numbers",
{"a": float, "b": float}
)
async def multiply(args: dict[str, Any]) -> dict[str, Any]:
return {'result': args['a'] * args['b']}
# 创建 MCP 服务器并注册工具
calculator_server = create_sdk_mcp_server(
name='calculator',
tools=[add, multiply]
)
# 在 SDK 中使用自定义工具
async def calculate():
result = query(
prompt='Calculate 15 + 27 and then multiply the result by 3',
options={
'mcp_servers': {
'calculator': calculator_server,
},
},
)
async for message in result:
print(message)
# 运行
import asyncio
asyncio.run(calculate())创建自定义工具
TypeScript - 基本工具定义
typescript
import { createSdkMcpServer, tool } from '@tencent-ai/agent-sdk';
import { z } from 'zod';
const myServer = createSdkMcpServer('my-tools', {
tools: [
tool({
name: 'my_tool',
description: 'Description of what the tool does',
schema: z.object({
parameter1: z.string().describe('Description of parameter1'),
parameter2: z.number().optional().describe('Optional parameter'),
}),
handler: async (input) => {
// 实现工具逻辑
return {
result: 'Tool output',
details: input,
};
},
}),
],
});TypeScript - 完整示例:文件分析工具
typescript
import { createSdkMcpServer, tool } from '@tencent-ai/agent-sdk';
import { z } from 'zod';
import * as fs from 'fs/promises';
import * as path from 'path';
const fileAnalysisServer = createSdkMcpServer('file-analysis', {
tools: [
tool({
name: 'count_lines',
description: 'Count lines in a file',
schema: z.object({
filePath: z.string().describe('Path to the file'),
}),
handler: async ({ filePath }) => {
try {
const content = await fs.readFile(filePath, 'utf-8');
const lineCount = content.split('\n').length;
return {
success: true,
filePath,
lineCount,
};
} catch (error) {
return {
success: false,
error: error instanceof Error ? error.message : 'Unknown error',
};
}
},
}),
tool({
name: 'list_files',
description: 'List all files in a directory',
schema: z.object({
dirPath: z.string().describe('Path to the directory'),
pattern: z.string().optional().describe('Optional glob pattern'),
}),
handler: async ({ dirPath, pattern }) => {
try {
const files = await fs.readdir(dirPath);
let filtered = files;
if (pattern) {
const minimatch = require('minimatch').minimatch;
filtered = files.filter(f => minimatch(f, pattern));
}
return {
success: true,
dirPath,
files: filtered,
count: filtered.length,
};
} catch (error) {
return {
success: false,
error: error instanceof Error ? error.message : 'Unknown error',
};
}
},
}),
tool({
name: 'get_file_info',
description: 'Get information about a file',
schema: z.object({
filePath: z.string().describe('Path to the file'),
}),
handler: async ({ filePath }) => {
try {
const stats = await fs.stat(filePath);
return {
success: true,
filePath,
size: stats.size,
created: stats.birthtime,
modified: stats.mtime,
isDirectory: stats.isDirectory(),
isFile: stats.isFile(),
};
} catch (error) {
return {
success: false,
error: error instanceof Error ? error.message : 'Unknown error',
};
}
},
}),
],
});
export default fileAnalysisServer;Python - 装饰器模式
python
from codebuddy_agent_sdk import create_sdk_mcp_server, tool
from typing import Any
import os
@tool(
"count_lines",
"Count lines in a file",
{"file_path": str}
)
async def count_lines(args: dict[str, Any]) -> dict[str, Any]:
try:
with open(args['file_path'], 'r') as f:
line_count = len(f.readlines())
return {
'success': True,
'file_path': args['file_path'],
'line_count': line_count,
}
except Exception as e:
return {
'success': False,
'error': str(e),
}
@tool(
"list_files",
"List all files in a directory",
{"dir_path": str, "pattern": str}
)
async def list_files(args: dict[str, Any]) -> dict[str, Any]:
try:
files = os.listdir(args['dir_path'])
pattern = args.get('pattern')
if pattern:
import fnmatch
files = [f for f in files if fnmatch.fnmatch(f, pattern)]
return {
'success': True,
'dir_path': args['dir_path'],
'files': files,
'count': len(files),
}
except Exception as e:
return {
'success': False,
'error': str(e),
}
@tool(
"get_file_info",
"Get information about a file",
{"file_path": str}
)
async def get_file_info(args: dict[str, Any]) -> dict[str, Any]:
try:
file_path = args['file_path']
stat = os.stat(file_path)
return {
'success': True,
'file_path': file_path,
'size': stat.st_size,
'created': stat.st_ctime,
'modified': stat.st_mtime,
'is_file': os.path.isfile(file_path),
'is_dir': os.path.isdir(file_path),
}
except Exception as e:
return {
'success': False,
'error': str(e),
}
# 创建 MCP 服务器并注册工具
file_analysis_server = create_sdk_mcp_server(
name='file-analysis',
tools=[count_lines, list_files, get_file_info]
)多个工具管理
TypeScript
typescript
import { createSdkMcpServer, tool } from '@tencent-ai/agent-sdk';
import { z } from 'zod';
const multiToolServer = createSdkMcpServer('multi-tools', {
tools: [
tool({
name: 'tool_one',
description: 'First tool',
schema: z.object({ input: z.string() }),
handler: async ({ input }) => ({ result: `Tool 1: ${input}` }),
}),
tool({
name: 'tool_two',
description: 'Second tool',
schema: z.object({ data: z.number() }),
handler: async ({ data }) => ({ result: `Tool 2: ${data * 2}` }),
}),
tool({
name: 'tool_three',
description: 'Third tool',
schema: z.object({
name: z.string(),
age: z.number().optional(),
}),
handler: async ({ name, age }) => ({
result: `Tool 3: ${name}, age ${age ?? 'unknown'}`,
}),
}),
],
});
// 在 SDK 中使用
const result = query({
prompt: 'Use all the available tools',
options: {
mcpServers: {
'multi-tools': multiToolServer,
},
},
});Python
python
from codebuddy_agent_sdk import create_sdk_mcp_server, tool
from typing import Any
@tool("tool_one", "First tool", {"input": str})
async def tool_one(args: dict[str, Any]) -> dict[str, Any]:
return {'result': f"Tool 1: {args['input']}"}
@tool("tool_two", "Second tool", {"data": int})
async def tool_two(args: dict[str, Any]) -> dict[str, Any]:
return {'result': f"Tool 2: {args['data'] * 2}"}
@tool("tool_three", "Third tool", {"name": str, "age": int})
async def tool_three(args: dict[str, Any]) -> dict[str, Any]:
age_str = args.get('age', 'unknown')
return {'result': f"Tool 3: {args['name']}, age {age_str}"}
# 创建 MCP 服务器并注册工具
server = create_sdk_mcp_server(
name='multi-tools',
tools=[tool_one, tool_two, tool_three]
)类型安全
TypeScript - 使用 Zod 模式
Zod 提供运行时类型验证和强大的类型推断:
typescript
import { createSdkMcpServer, tool } from '@tencent-ai/agent-sdk';
import { z } from 'zod';
const dataProcessingServer = createSdkMcpServer('data-processing', {
tools: [
tool({
name: 'process_user_data',
description: 'Process and validate user data',
schema: z.object({
userId: z.number().int().positive().describe('User ID'),
email: z.string().email().describe('User email'),
tags: z.array(z.string()).describe('User tags'),
preferences: z.object({
notifications: z.boolean().default(true),
theme: z.enum(['light', 'dark', 'auto']).default('auto'),
}).optional(),
}),
handler: async (input) => {
// input 类型完全由 Zod schema 推断
// TypeScript 知道所有字段的类型
const result = {
userId: input.userId,
email: input.email,
tagCount: input.tags.length,
hasPreferences: !!input.preferences,
};
return result;
},
}),
],
});Python - 类型注解
Python SDK 使用 @tool 装饰器定义工具,支持简单类型映射或 JSON Schema:
python
from codebuddy_agent_sdk import create_sdk_mcp_server, tool
from typing import Any
# 使用 JSON Schema 进行高级验证
@tool(
"process_user_data",
"Process and validate user data",
{
"type": "object",
"properties": {
"user_id": {"type": "integer", "minimum": 1},
"email": {"type": "string", "format": "email"},
"tags": {"type": "array", "items": {"type": "string"}},
"notifications": {"type": "boolean", "default": True},
"theme": {"type": "string", "enum": ["light", "dark", "auto"], "default": "auto"}
},
"required": ["user_id", "email", "tags"]
}
)
async def process_user_data(args: dict[str, Any]) -> dict[str, Any]:
return {
'user_id': args['user_id'],
'email': args['email'],
'tag_count': len(args['tags']),
'theme': args.get('theme', 'auto'),
'notifications': args.get('notifications', True),
}
# 创建 MCP 服务器并注册工具
server = create_sdk_mcp_server(
name='data-processing',
tools=[process_user_data]
)完整示例:数据库查询工具
TypeScript
typescript
import { createSdkMcpServer, tool } from '@tencent-ai/agent-sdk';
import { z } from 'zod';
interface QueryResult {
rows: Record<string, any>[];
rowCount: number;
}
interface Database {
query(sql: string, params?: any[]): Promise<QueryResult>;
}
// 假设你已有数据库连接
const db: Database = new Database();
const databaseServer = createSdkMcpServer('database', {
tools: [
tool({
name: 'execute_query',
description: 'Execute a read-only SQL query',
schema: z.object({
sql: z.string().describe('SQL query to execute'),
params: z.array(z.any()).optional().describe('Query parameters'),
}),
handler: async ({ sql, params }) => {
try {
// 防止危险操作
const upperSql = sql.toUpperCase();
if (
upperSql.includes('DROP') ||
upperSql.includes('DELETE') ||
upperSql.includes('UPDATE') ||
upperSql.includes('INSERT')
) {
return {
success: false,
error: 'Only SELECT queries are allowed',
};
}
const result = await db.query(sql, params);
return {
success: true,
rows: result.rows,
rowCount: result.rowCount,
};
} catch (error) {
return {
success: false,
error: error instanceof Error ? error.message : 'Query execution failed',
};
}
},
}),
tool({
name: 'get_table_schema',
description: 'Get the schema of a table',
schema: z.object({
tableName: z.string().describe('Name of the table'),
}),
handler: async ({ tableName }) => {
try {
const result = await db.query(
`SELECT column_name, data_type FROM information_schema.columns WHERE table_name = $1`,
[tableName]
);
return {
success: true,
tableName,
columns: result.rows,
};
} catch (error) {
return {
success: false,
error: error instanceof Error ? error.message : 'Schema retrieval failed',
};
}
},
}),
],
});Python
python
from codebuddy_agent_sdk import create_sdk_mcp_server, tool
from typing import Any
class Database:
"""Simplified database wrapper"""
async def query(self, sql: str, params: list[Any] = None) -> dict[str, Any]:
# 实现实际的数据库查询
pass
db = Database()
@tool(
"execute_query",
"Execute a read-only SQL query",
{"sql": str, "params": list}
)
async def execute_query(args: dict[str, Any]) -> dict[str, Any]:
try:
sql = args['sql']
params = args.get('params')
# 防止危险操作
dangerous_keywords = ['DROP', 'DELETE', 'UPDATE', 'INSERT']
if any(keyword in sql.upper() for keyword in dangerous_keywords):
return {
'success': False,
'error': 'Only SELECT queries are allowed',
}
result = await db.query(sql, params)
return {
'success': True,
'rows': result.get('rows', []),
'row_count': result.get('row_count', 0),
}
except Exception as e:
return {
'success': False,
'error': str(e),
}
@tool(
"get_table_schema",
"Get the schema of a table",
{"table_name": str}
)
async def get_table_schema(args: dict[str, Any]) -> dict[str, Any]:
try:
table_name = args['table_name']
result = await db.query(
'SELECT column_name, data_type FROM information_schema.columns WHERE table_name = %s',
[table_name]
)
return {
'success': True,
'table_name': table_name,
'columns': result.get('rows', []),
}
except Exception as e:
return {
'success': False,
'error': str(e),
}
# 创建 MCP 服务器并注册工具
server = create_sdk_mcp_server(
name='database',
tools=[execute_query, get_table_schema]
)完整示例:API 集成工具
TypeScript
typescript
import { createSdkMcpServer, tool } from '@tencent-ai/agent-sdk';
import { z } from 'zod';
const apiGatewayServer = createSdkMcpServer('api-gateway', {
tools: [
tool({
name: 'stripe_create_payment',
description: 'Create a payment through Stripe',
schema: z.object({
amount: z.number().positive().describe('Amount in cents'),
currency: z.string().default('usd').describe('Currency code'),
description: z.string().optional().describe('Payment description'),
}),
handler: async ({ amount, currency, description }) => {
try {
const response = await fetch('https://api.stripe.com/v1/payment_intents', {
method: 'POST',
headers: {
'Authorization': `Bearer ${process.env.STRIPE_API_KEY}`,
'Content-Type': 'application/x-www-form-urlencoded',
},
body: new URLSearchParams({
amount: amount.toString(),
currency,
...(description && { description }),
}),
});
if (!response.ok) {
const error = await response.json();
return {
success: false,
error: error.error?.message || 'Payment creation failed',
};
}
const data = await response.json();
return {
success: true,
paymentId: data.id,
status: data.status,
clientSecret: data.client_secret,
};
} catch (error) {
return {
success: false,
error: error instanceof Error ? error.message : 'Unknown error',
};
}
},
}),
tool({
name: 'github_search_repos',
description: 'Search repositories on GitHub',
schema: z.object({
query: z.string().describe('Search query'),
language: z.string().optional().describe('Programming language'),
sort: z.enum(['stars', 'forks', 'updated']).default('stars'),
}),
handler: async ({ query, language, sort }) => {
try {
const searchQuery = language
? `${query} language:${language}`
: query;
const response = await fetch(
`https://api.github.com/search/repositories?q=${encodeURIComponent(
searchQuery
)}&sort=${sort}`,
{
headers: {
'Authorization': `Bearer ${process.env.GITHUB_TOKEN}`,
},
}
);
if (!response.ok) {
return {
success: false,
error: `GitHub API error: ${response.status}`,
};
}
const data = await response.json();
return {
success: true,
repos: data.items.map((repo: any) => ({
name: repo.name,
url: repo.html_url,
stars: repo.stargazers_count,
language: repo.language,
description: repo.description,
})),
total: data.total_count,
};
} catch (error) {
return {
success: false,
error: error instanceof Error ? error.message : 'Unknown error',
};
}
},
}),
tool({
name: 'slack_send_message',
description: 'Send a message to a Slack channel',
schema: z.object({
channel: z.string().describe('Channel ID or name'),
text: z.string().describe('Message text'),
thread_ts: z.string().optional().describe('Thread timestamp (for replies)'),
}),
handler: async ({ channel, text, thread_ts }) => {
try {
const payload: Record<string, any> = {
channel,
text,
};
if (thread_ts) {
payload.thread_ts = thread_ts;
}
const response = await fetch('https://slack.com/api/chat.postMessage', {
method: 'POST',
headers: {
'Authorization': `Bearer ${process.env.SLACK_BOT_TOKEN}`,
'Content-Type': 'application/json',
},
body: JSON.stringify(payload),
});
const data = await response.json();
if (!data.ok) {
return {
success: false,
error: data.error || 'Failed to send message',
};
}
return {
success: true,
messageTs: data.ts,
channel: data.channel,
};
} catch (error) {
return {
success: false,
error: error instanceof Error ? error.message : 'Unknown error',
};
}
},
}),
],
});
export default apiGatewayServer;Python
python
from codebuddy_agent_sdk import create_sdk_mcp_server, tool
from typing import Any
import requests
import os
@tool(
"stripe_create_payment",
"Create a payment through Stripe",
{"amount": float, "currency": str, "description": str}
)
async def stripe_create_payment(args: dict[str, Any]) -> dict[str, Any]:
try:
headers = {
'Authorization': f"Bearer {os.environ.get('STRIPE_API_KEY')}",
}
data = {
'amount': int(args['amount']),
'currency': args.get('currency', 'usd'),
}
description = args.get('description')
if description:
data['description'] = description
response = requests.post(
'https://api.stripe.com/v1/payment_intents',
headers=headers,
data=data,
)
if response.status_code >= 400:
error = response.json().get('error', {})
return {
'success': False,
'error': error.get('message', 'Payment creation failed'),
}
resp_data = response.json()
return {
'success': True,
'payment_id': resp_data['id'],
'status': resp_data['status'],
'client_secret': resp_data.get('client_secret'),
}
except Exception as e:
return {
'success': False,
'error': str(e),
}
@tool(
"github_search_repos",
"Search repositories on GitHub",
{"query": str, "language": str, "sort": str}
)
async def github_search_repos(args: dict[str, Any]) -> dict[str, Any]:
try:
query = args['query']
language = args.get('language')
sort = args.get('sort', 'stars')
search_query = f"{query} language:{language}" if language else query
response = requests.get(
'https://api.github.com/search/repositories',
params={
'q': search_query,
'sort': sort,
},
headers={
'Authorization': f"Bearer {os.environ.get('GITHUB_TOKEN')}",
},
)
if response.status_code >= 400:
return {
'success': False,
'error': f"GitHub API error: {response.status_code}",
}
data = response.json()
repos = [
{
'name': repo['name'],
'url': repo['html_url'],
'stars': repo['stargazers_count'],
'language': repo['language'],
'description': repo['description'],
}
for repo in data.get('items', [])
]
return {
'success': True,
'repos': repos,
'total': data.get('total_count', 0),
}
except Exception as e:
return {
'success': False,
'error': str(e),
}
@tool(
"slack_send_message",
"Send a message to a Slack channel",
{"channel": str, "text": str, "thread_ts": str}
)
async def slack_send_message(args: dict[str, Any]) -> dict[str, Any]:
try:
payload = {
'channel': args['channel'],
'text': args['text'],
}
thread_ts = args.get('thread_ts')
if thread_ts:
payload['thread_ts'] = thread_ts
response = requests.post(
'https://slack.com/api/chat.postMessage',
headers={
'Authorization': f"Bearer {os.environ.get('SLACK_BOT_TOKEN')}",
'Content-Type': 'application/json',
},
json=payload,
)
data = response.json()
if not data.get('ok'):
return {
'success': False,
'error': data.get('error', 'Failed to send message'),
}
return {
'success': True,
'message_ts': data['ts'],
'channel': data['channel'],
}
except Exception as e:
return {
'success': False,
'error': str(e),
}
# 创建 MCP 服务器并注册工具
server = create_sdk_mcp_server(
name='api-gateway',
tools=[stripe_create_payment, github_search_repos, slack_send_message]
)选择性地允许工具
你可以选择性地允许特定工具被调用:
TypeScript
typescript
import { query, createSdkMcpServer, tool } from '@tencent-ai/agent-sdk';
import { z } from 'zod';
const result = query({
prompt: 'Search for popular repositories and send a message to Slack',
options: {
mcpServers: {
'api-gateway': apiGatewayServer,
},
canUseTool: (toolCall) => {
// 只允许 GitHub 搜索工具
const allowedTools = [
'mcp__api-gateway__github_search_repos',
];
if (!allowedTools.includes(toolCall.name)) {
return false;
}
return true;
},
},
});Python
python
from codebuddy_agent_sdk import query
from api_gateway_server import server as api_gateway_server
async def main():
result = query(
prompt='Search for popular repositories and send a message to Slack',
options={
'mcp_servers': {
'api-gateway': api_gateway_server,
},
'can_use_tool': lambda tool_call: (
# 只允许 GitHub 搜索工具
tool_call.get('name') == 'mcp__api-gateway__github_search_repos'
),
},
)
async for message in result:
print(message)
import asyncio
asyncio.run(main())错误处理
TypeScript - API 调用错误处理
typescript
import { createSdkMcpServer, tool } from '@tencent-ai/agent-sdk';
import { z } from 'zod';
const apiServer = createSdkMcpServer('api-tools', {
tools: [
tool({
name: 'fetch_data',
description: 'Fetch data from an API',
schema: z.object({
endpoint: z.string().url().describe('API endpoint URL'),
}),
handler: async ({ endpoint }) => {
try {
const response = await fetch(endpoint);
if (!response.ok) {
return {
content: [{
type: 'text',
text: `API error: ${response.status} ${response.statusText}`,
}],
};
}
const data = await response.json();
return {
content: [{
type: 'text',
text: JSON.stringify(data, null, 2),
}],
};
} catch (error) {
return {
content: [{
type: 'text',
text: `Failed to fetch data: ${error instanceof Error ? error.message : String(error)}`,
}],
};
}
},
}),
],
});Python - API 调用错误处理
python
from codebuddy_agent_sdk import create_sdk_mcp_server, tool
from typing import Any
import aiohttp
import json
@tool(
"fetch_data",
"Fetch data from an API",
{"endpoint": str}
)
async def fetch_data(args: dict[str, Any]) -> dict[str, Any]:
try:
async with aiohttp.ClientSession() as session:
async with session.get(args['endpoint']) as response:
if response.status != 200:
return {
'content': [{
'type': 'text',
'text': f'API error: {response.status} {response.reason}'
}]
}
data = await response.json()
return {
'content': [{
'type': 'text',
'text': json.dumps(data, indent=2)
}]
}
except Exception as e:
return {
'content': [{
'type': 'text',
'text': f'Failed to fetch data: {str(e)}'
}]
}
# 创建 MCP 服务器并注册工具
server = create_sdk_mcp_server(
name='api-tools',
tools=[fetch_data]
)最佳实践
1. 使用明确的参数类型和描述
为工具参数提供清晰的类型和描述,帮助 Agent 理解如何调用工具:
typescript
tool({
name: 'process_data',
schema: z.object({
data: z.array(z.string()).describe('Data to process'),
format: z.enum(['json', 'csv']).describe('Output format'),
}),
handler: async ({ data, format }) => {
// 处理逻辑
},
})2. 提供有意义的错误反馈
始终返回明确的错误信息,以便 Agent 和用户理解发生了什么:
typescript
handler: async (input) => {
try {
// 执行操作
} catch (error) {
return {
content: [{
type: 'text',
text: `Operation failed: ${error instanceof Error ? error.message : String(error)}`,
}],
};
}
}3. 验证输入参数
确保输入符合预期的格式和范围:
typescript
handler: async ({ userId, email }) => {
if (!Number.isInteger(userId) || userId <= 0) {
return {
content: [{
type: 'text',
text: 'Error: User ID must be a positive integer',
}],
};
}
if (!email.includes('@')) {
return {
content: [{
type: 'text',
text: 'Error: Invalid email format',
}],
};
}
// 继续处理
}