SDK Custom Tools Guide
Version Requirement: This documentation is for CodeBuddy Agent SDK v0.2.0 and above.
Feature Status: SDK Custom Tools is a Preview feature of the CodeBuddy Agent SDK.
This document explains how to create and use custom tools in the CodeBuddy Agent SDK. Custom tools allow you to define specialized functionality that Agents can invoke to complete specific tasks.
Overview
Custom Tools lets you define your own tools through MCP (Model Context Protocol). Unlike external MCP servers, which are configured and run separately, custom tools are defined directly within your application and need no separate process or server.
Core Advantages
- In-process Execution: Tools execute within the application, no need to create separate processes
- Type Safety: Full TypeScript type checking and type inference support
- Simplified Deployment: No need to deploy MCP servers separately, everything deploys with the application
- Tight Integration: Share memory and state with the application
- Zero Additional Dependencies: Leverages existing SDK infrastructure
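To make the in-process and shared-state points concrete, here is a minimal sketch in plain Python. It does not use the SDK at all: `AppState` and `record_visit` are hypothetical names, and the function only stands in for a tool handler. The point is that a handler running in the same process can read and update application objects directly, with no serialization or inter-process calls.

```python
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class AppState:
    """Hypothetical application state living in the host process."""
    visits: Dict[str, int] = field(default_factory=dict)


# One shared instance, owned by the application.
state = AppState()


def record_visit(page: str) -> dict:
    """Stand-in for a tool handler: mutates the shared state in place."""
    state.visits[page] = state.visits.get(page, 0) + 1
    return {'page': page, 'visits': state.visits[page]}


# The application and the handler see the same object.
record_visit('/home')
record_visit('/home')
print(state.visits)
```

With an external MCP server, the same counter would have to live in the server process or be passed back and forth over the protocol.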
Quick Start
TypeScript
Create a simple calculator tool:
typescript
import { query } from '@tencent-ai/agent-sdk';
import { createSdkMcpServer, tool } from '@tencent-ai/agent-sdk/mcp';
import { z } from 'zod';
// Create MCP server and define tools
const calculatorServer = createSdkMcpServer('calculator', {
tools: [
tool({
name: 'add',
description: 'Add two numbers',
schema: z.object({
a: z.number().describe('First number'),
b: z.number().describe('Second number'),
}),
handler: async ({ a, b }) => {
return { result: a + b };
},
}),
tool({
name: 'multiply',
description: 'Multiply two numbers',
schema: z.object({
a: z.number().describe('First number'),
b: z.number().describe('Second number'),
}),
handler: async ({ a, b }) => {
return { result: a * b };
},
}),
],
});
// Use custom tools in SDK
const result = query({
prompt: 'Calculate 15 + 27 and then multiply the result by 3',
options: {
mcpServers: {
'calculator': calculatorServer,
},
},
});
for await (const message of result) {
console.log(message);
}
Python
The Python SDK uses a decorator pattern for defining tools:
python
import asyncio

from codebuddy_agent_sdk import query, create_sdk_mcp_server

# Create MCP server and define tools using decorators
calculator_server = create_sdk_mcp_server('calculator')

@calculator_server.tool()
def add(a: float, b: float) -> dict:
    """Add two numbers"""
    return {'result': a + b}

@calculator_server.tool()
def multiply(a: float, b: float) -> dict:
    """Multiply two numbers"""
    return {'result': a * b}

# Use custom tools in SDK
async def calculate():
    result = query(
        prompt='Calculate 15 + 27 and then multiply the result by 3',
        options={
            'mcp_servers': {
                'calculator': calculator_server,
            },
        },
    )
    async for message in result:
        print(message)

# Run
asyncio.run(calculate())
Creating Custom Tools
TypeScript - Basic Tool Definition
typescript
import { createSdkMcpServer, tool } from '@tencent-ai/agent-sdk/mcp';
import { z } from 'zod';
const myServer = createSdkMcpServer('my-tools', {
tools: [
tool({
name: 'my_tool',
description: 'Description of what the tool does',
schema: z.object({
parameter1: z.string().describe('Description of parameter1'),
parameter2: z.number().optional().describe('Optional parameter'),
}),
handler: async (input) => {
// Implement tool logic
return {
result: 'Tool output',
details: input,
};
},
}),
],
});
TypeScript - Complete Example: File Analysis Tool
typescript
import { createSdkMcpServer, tool } from '@tencent-ai/agent-sdk/mcp';
import { z } from 'zod';
import * as fs from 'fs/promises';
import * as path from 'path';
const fileAnalysisServer = createSdkMcpServer('file-analysis', {
tools: [
tool({
name: 'count_lines',
description: 'Count lines in a file',
schema: z.object({
filePath: z.string().describe('Path to the file'),
}),
handler: async ({ filePath }) => {
try {
const content = await fs.readFile(filePath, 'utf-8');
const lineCount = content.split('\n').length;
return {
success: true,
filePath,
lineCount,
};
} catch (error) {
return {
success: false,
error: error instanceof Error ? error.message : 'Unknown error',
};
}
},
}),
tool({
name: 'list_files',
description: 'List all files in a directory',
schema: z.object({
dirPath: z.string().describe('Path to the directory'),
pattern: z.string().optional().describe('Optional glob pattern'),
}),
handler: async ({ dirPath, pattern }) => {
try {
const files = await fs.readdir(dirPath);
let filtered = files;
if (pattern) {
// Load minimatch on demand; a dynamic import works in both ESM and CommonJS builds
const { minimatch } = await import('minimatch');
filtered = files.filter(f => minimatch(f, pattern));
}
return {
success: true,
dirPath,
files: filtered,
count: filtered.length,
};
} catch (error) {
return {
success: false,
error: error instanceof Error ? error.message : 'Unknown error',
};
}
},
}),
tool({
name: 'get_file_info',
description: 'Get information about a file',
schema: z.object({
filePath: z.string().describe('Path to the file'),
}),
handler: async ({ filePath }) => {
try {
const stats = await fs.stat(filePath);
return {
success: true,
filePath,
size: stats.size,
created: stats.birthtime,
modified: stats.mtime,
isDirectory: stats.isDirectory(),
isFile: stats.isFile(),
};
} catch (error) {
return {
success: false,
error: error instanceof Error ? error.message : 'Unknown error',
};
}
},
}),
],
});
export default fileAnalysisServer;
Python - Decorator Pattern
python
import fnmatch
import os
from typing import Optional

from codebuddy_agent_sdk import create_sdk_mcp_server

file_analysis_server = create_sdk_mcp_server('file-analysis')

@file_analysis_server.tool()
def count_lines(file_path: str) -> dict:
    """Count lines in a file"""
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            line_count = len(f.readlines())
        return {
            'success': True,
            'file_path': file_path,
            'line_count': line_count,
        }
    except Exception as e:
        return {
            'success': False,
            'error': str(e),
        }

@file_analysis_server.tool()
def list_files(dir_path: str, pattern: Optional[str] = None) -> dict:
    """List all files in a directory"""
    try:
        files = os.listdir(dir_path)
        if pattern:
            # Keep only the entries matching the glob pattern
            files = [f for f in files if fnmatch.fnmatch(f, pattern)]
        return {
            'success': True,
            'dir_path': dir_path,
            'files': files,
            'count': len(files),
        }
    except Exception as e:
        return {
            'success': False,
            'error': str(e),
        }

@file_analysis_server.tool()
def get_file_info(file_path: str) -> dict:
    """Get information about a file"""
    try:
        stat = os.stat(file_path)
        return {
            'success': True,
            'file_path': file_path,
            'size': stat.st_size,
            # Note: on Unix, st_ctime is the last metadata change, not the creation time
            'created': stat.st_ctime,
            'modified': stat.st_mtime,
            'is_file': os.path.isfile(file_path),
            'is_dir': os.path.isdir(file_path),
        }
    except Exception as e:
        return {
            'success': False,
            'error': str(e),
        }
Managing Multiple Tools
TypeScript
typescript
import { query } from '@tencent-ai/agent-sdk';
import { createSdkMcpServer, tool } from '@tencent-ai/agent-sdk/mcp';
import { z } from 'zod';
const multiToolServer = createSdkMcpServer('multi-tools', {
tools: [
tool({
name: 'tool_one',
description: 'First tool',
schema: z.object({ input: z.string() }),
handler: async ({ input }) => ({ result: `Tool 1: ${input}` }),
}),
tool({
name: 'tool_two',
description: 'Second tool',
schema: z.object({ data: z.number() }),
handler: async ({ data }) => ({ result: `Tool 2: ${data * 2}` }),
}),
tool({
name: 'tool_three',
description: 'Third tool',
schema: z.object({
name: z.string(),
age: z.number().optional(),
}),
handler: async ({ name, age }) => ({
result: `Tool 3: ${name}, age ${age ?? 'unknown'}`,
}),
}),
],
});
// Use in SDK
const result = query({
prompt: 'Use all the available tools',
options: {
mcpServers: {
'multi-tools': multiToolServer,
},
},
});
Python
python
from typing import Optional

from codebuddy_agent_sdk import create_sdk_mcp_server

server = create_sdk_mcp_server('multi-tools')

@server.tool()
def tool_one(input: str) -> dict:
    """First tool"""
    return {'result': f'Tool 1: {input}'}

@server.tool()
def tool_two(data: int) -> dict:
    """Second tool"""
    return {'result': f'Tool 2: {data * 2}'}

@server.tool()
def tool_three(name: str, age: Optional[int] = None) -> dict:
    """Third tool"""
    # Compare against None so that an age of 0 is not reported as unknown
    age_str = age if age is not None else 'unknown'
    return {'result': f'Tool 3: {name}, age {age_str}'}
Type Safety
TypeScript - Using Zod Schema
Zod provides runtime type validation and powerful type inference:
typescript
import { createSdkMcpServer, tool } from '@tencent-ai/agent-sdk/mcp';
import { z } from 'zod';
const dataProcessingServer = createSdkMcpServer('data-processing', {
tools: [
tool({
name: 'process_user_data',
description: 'Process and validate user data',
schema: z.object({
userId: z.number().int().positive().describe('User ID'),
email: z.string().email().describe('User email'),
tags: z.array(z.string()).describe('User tags'),
preferences: z.object({
notifications: z.boolean().default(true),
theme: z.enum(['light', 'dark', 'auto']).default('auto'),
}).optional(),
}),
handler: async (input) => {
// input type is fully inferred from Zod schema
// TypeScript knows the types of all fields
const result = {
userId: input.userId,
email: input.email,
tagCount: input.tags.length,
hasPreferences: !!input.preferences,
};
return result;
},
}),
],
});
Python - Type Annotations
The Python SDK uses standard type annotations:
python
from enum import Enum
from typing import Any, Dict, List

from codebuddy_agent_sdk import create_sdk_mcp_server

class Theme(str, Enum):
    LIGHT = 'light'
    DARK = 'dark'
    AUTO = 'auto'

server = create_sdk_mcp_server('data-processing')

@server.tool()
def process_user_data(
    user_id: int,
    email: str,
    tags: List[str],
    notifications: bool = True,
    theme: Theme = Theme.AUTO,
) -> Dict[str, Any]:
    """Process and validate user data"""
    return {
        'user_id': user_id,
        'email': email,
        'tag_count': len(tags),
        'theme': theme.value,
        'notifications': notifications,
    }
Complete Example: Database Query Tool
TypeScript
typescript
import { createSdkMcpServer, tool } from '@tencent-ai/agent-sdk/mcp';
import { z } from 'zod';
interface QueryResult {
rows: Record<string, any>[];
rowCount: number;
}
interface Database {
query(sql: string, params?: any[]): Promise<QueryResult>;
}
// Assume a database connection is created elsewhere in your application.
// `Database` is only an interface here, so it cannot be instantiated with `new`.
declare const db: Database;
const databaseServer = createSdkMcpServer('database', {
tools: [
tool({
name: 'execute_query',
description: 'Execute a read-only SQL query',
schema: z.object({
sql: z.string().describe('SQL query to execute'),
params: z.array(z.any()).optional().describe('Query parameters'),
}),
handler: async ({ sql, params }) => {
try {
// Reject statements containing write keywords. Whole-word matching avoids
// false positives on names such as "updated_at". This is a coarse guard;
// use a read-only database role for real protection.
if (/\b(DROP|DELETE|UPDATE|INSERT)\b/i.test(sql)) {
return {
success: false,
error: 'Only SELECT queries are allowed',
};
}
const result = await db.query(sql, params);
return {
success: true,
rows: result.rows,
rowCount: result.rowCount,
};
} catch (error) {
return {
success: false,
error: error instanceof Error ? error.message : 'Query execution failed',
};
}
},
}),
tool({
name: 'get_table_schema',
description: 'Get the schema of a table',
schema: z.object({
tableName: z.string().describe('Name of the table'),
}),
handler: async ({ tableName }) => {
try {
const result = await db.query(
`SELECT column_name, data_type FROM information_schema.columns WHERE table_name = $1`,
[tableName]
);
return {
success: true,
tableName,
columns: result.rows,
};
} catch (error) {
return {
success: false,
error: error instanceof Error ? error.message : 'Schema retrieval failed',
};
}
},
}),
],
});
Python
python
import re
from typing import Any, Dict, List, Optional

from codebuddy_agent_sdk import create_sdk_mcp_server

server = create_sdk_mcp_server('database')

# Whole-word match, so names such as "updated_at" are not rejected
WRITE_KEYWORDS = re.compile(r'\b(DROP|DELETE|UPDATE|INSERT)\b', re.IGNORECASE)

class Database:
    """Simplified database wrapper"""
    async def query(self, sql: str, params: Optional[List[Any]] = None) -> Dict[str, Any]:
        # Replace with an actual database query
        raise NotImplementedError

db = Database()

@server.tool()
async def execute_query(
    sql: str,
    params: Optional[List[Any]] = None,
) -> Dict[str, Any]:
    """Execute a read-only SQL query"""
    try:
        # Prevent dangerous operations (a coarse guard, not a full SQL parser)
        if WRITE_KEYWORDS.search(sql):
            return {
                'success': False,
                'error': 'Only SELECT queries are allowed',
            }
        result = await db.query(sql, params)
        return {
            'success': True,
            'rows': result.get('rows', []),
            'row_count': result.get('row_count', 0),
        }
    except Exception as e:
        return {
            'success': False,
            'error': str(e),
        }

@server.tool()
async def get_table_schema(table_name: str) -> Dict[str, Any]:
    """Get the schema of a table"""
    try:
        result = await db.query(
            'SELECT column_name, data_type FROM information_schema.columns WHERE table_name = %s',
            [table_name]
        )
        return {
            'success': True,
            'table_name': table_name,
            'columns': result.get('rows', []),
        }
    except Exception as e:
        return {
            'success': False,
            'error': str(e),
        }
Complete Example: API Integration Tool
TypeScript
typescript
import { createSdkMcpServer, tool } from '@tencent-ai/agent-sdk/mcp';
import { z } from 'zod';
const apiGatewayServer = createSdkMcpServer('api-gateway', {
tools: [
tool({
name: 'stripe_create_payment',
description: 'Create a payment through Stripe',
schema: z.object({
amount: z.number().positive().describe('Amount in cents'),
currency: z.string().default('usd').describe('Currency code'),
description: z.string().optional().describe('Payment description'),
}),
handler: async ({ amount, currency, description }) => {
try {
const response = await fetch('https://api.stripe.com/v1/payment_intents', {
method: 'POST',
headers: {
'Authorization': `Bearer ${process.env.STRIPE_API_KEY}`,
'Content-Type': 'application/x-www-form-urlencoded',
},
body: new URLSearchParams({
amount: amount.toString(),
currency,
...(description && { description }),
}),
});
if (!response.ok) {
const error = await response.json();
return {
success: false,
error: error.error?.message || 'Payment creation failed',
};
}
const data = await response.json();
return {
success: true,
paymentId: data.id,
status: data.status,
clientSecret: data.client_secret,
};
} catch (error) {
return {
success: false,
error: error instanceof Error ? error.message : 'Unknown error',
};
}
},
}),
tool({
name: 'github_search_repos',
description: 'Search repositories on GitHub',
schema: z.object({
query: z.string().describe('Search query'),
language: z.string().optional().describe('Programming language'),
sort: z.enum(['stars', 'forks', 'updated']).default('stars'),
}),
handler: async ({ query, language, sort }) => {
try {
const searchQuery = language
? `${query} language:${language}`
: query;
const response = await fetch(
`https://api.github.com/search/repositories?q=${encodeURIComponent(
searchQuery
)}&sort=${sort}`,
{
headers: {
'Authorization': `Bearer ${process.env.GITHUB_TOKEN}`,
},
}
);
if (!response.ok) {
return {
success: false,
error: `GitHub API error: ${response.status}`,
};
}
const data = await response.json();
return {
success: true,
repos: data.items.map((repo: any) => ({
name: repo.name,
url: repo.html_url,
stars: repo.stargazers_count,
language: repo.language,
description: repo.description,
})),
total: data.total_count,
};
} catch (error) {
return {
success: false,
error: error instanceof Error ? error.message : 'Unknown error',
};
}
},
}),
tool({
name: 'slack_send_message',
description: 'Send a message to a Slack channel',
schema: z.object({
channel: z.string().describe('Channel ID or name'),
text: z.string().describe('Message text'),
thread_ts: z.string().optional().describe('Thread timestamp (for replies)'),
}),
handler: async ({ channel, text, thread_ts }) => {
try {
const payload: Record<string, any> = {
channel,
text,
};
if (thread_ts) {
payload.thread_ts = thread_ts;
}
const response = await fetch('https://slack.com/api/chat.postMessage', {
method: 'POST',
headers: {
'Authorization': `Bearer ${process.env.SLACK_BOT_TOKEN}`,
'Content-Type': 'application/json',
},
body: JSON.stringify(payload),
});
const data = await response.json();
if (!data.ok) {
return {
success: false,
error: data.error || 'Failed to send message',
};
}
return {
success: true,
messageTs: data.ts,
channel: data.channel,
};
} catch (error) {
return {
success: false,
error: error instanceof Error ? error.message : 'Unknown error',
};
}
},
}),
],
});
export default apiGatewayServer;
Python
python
import os
from typing import Any, Dict, Optional

import requests

from codebuddy_agent_sdk import create_sdk_mcp_server

server = create_sdk_mcp_server('api-gateway')

@server.tool()
def stripe_create_payment(
    amount: float,
    currency: str = 'usd',
    description: Optional[str] = None,
) -> Dict[str, Any]:
    """Create a payment through Stripe"""
    try:
        headers = {
            'Authorization': f"Bearer {os.environ.get('STRIPE_API_KEY')}",
        }
        # Amount is in cents, so it is sent as an integer
        payload = {
            'amount': int(amount),
            'currency': currency,
        }
        if description:
            payload['description'] = description
        response = requests.post(
            'https://api.stripe.com/v1/payment_intents',
            headers=headers,
            data=payload,
            timeout=10,
        )
        if response.status_code >= 400:
            error = response.json().get('error', {})
            return {
                'success': False,
                'error': error.get('message', 'Payment creation failed'),
            }
        data = response.json()
        return {
            'success': True,
            'payment_id': data['id'],
            'status': data['status'],
            'client_secret': data.get('client_secret'),
        }
    except Exception as e:
        return {
            'success': False,
            'error': str(e),
        }

@server.tool()
def github_search_repos(
    query: str,
    language: Optional[str] = None,
    sort: str = 'stars',
) -> Dict[str, Any]:
    """Search repositories on GitHub"""
    try:
        search_query = f"{query} language:{language}" if language else query
        response = requests.get(
            'https://api.github.com/search/repositories',
            params={
                'q': search_query,
                'sort': sort,
            },
            headers={
                'Authorization': f"Bearer {os.environ.get('GITHUB_TOKEN')}",
            },
            timeout=10,
        )
        if response.status_code >= 400:
            return {
                'success': False,
                'error': f"GitHub API error: {response.status_code}",
            }
        data = response.json()
        repos = [
            {
                'name': repo['name'],
                'url': repo['html_url'],
                'stars': repo['stargazers_count'],
                'language': repo['language'],
                'description': repo['description'],
            }
            for repo in data.get('items', [])
        ]
        return {
            'success': True,
            'repos': repos,
            'total': data.get('total_count', 0),
        }
    except Exception as e:
        return {
            'success': False,
            'error': str(e),
        }

@server.tool()
def slack_send_message(
    channel: str,
    text: str,
    thread_ts: Optional[str] = None,
) -> Dict[str, Any]:
    """Send a message to a Slack channel"""
    try:
        payload = {
            'channel': channel,
            'text': text,
        }
        if thread_ts:
            payload['thread_ts'] = thread_ts
        response = requests.post(
            'https://slack.com/api/chat.postMessage',
            headers={
                'Authorization': f"Bearer {os.environ.get('SLACK_BOT_TOKEN')}",
                'Content-Type': 'application/json',
            },
            json=payload,
            timeout=10,
        )
        data = response.json()
        # Slack reports failures in the response body via the "ok" flag
        if not data.get('ok'):
            return {
                'success': False,
                'error': data.get('error', 'Failed to send message'),
            }
        return {
            'success': True,
            'message_ts': data['ts'],
            'channel': data['channel'],
        }
    except Exception as e:
        return {
            'success': False,
            'error': str(e),
        }
Selectively Allowing Tools
You can selectively allow specific tools to be called:
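The examples below match tools by a fully qualified name such as `mcp__api-gateway__github_search_repos`. Judging from that example, the name appears to follow the pattern `mcp__<server name>__<tool name>`; this is an inference from this guide, not a documented guarantee. Under that assumption, a small helper (hypothetical, not part of the SDK) can build the allowlist from server and tool names rather than from hand-written strings:

```python
from typing import Iterable, Set


def qualified_tool_name(server: str, tool: str) -> str:
    """Build the qualified name, assuming the mcp__<server>__<tool> pattern."""
    return f'mcp__{server}__{tool}'


def build_allowlist(server: str, tools: Iterable[str]) -> Set[str]:
    """Qualified names of every tool that may be called on one server."""
    return {qualified_tool_name(server, tool) for tool in tools}


ALLOWED = build_allowlist('api-gateway', ['github_search_repos'])


def is_allowed(tool_call: dict) -> bool:
    """Permission check for a tool call given as a dict with a 'name' key."""
    return tool_call.get('name') in ALLOWED
```

A function like `is_allowed` could then be passed wherever the permission callback is expected, provided the callback receives a dict with a `name` key as the Python example in this section suggests.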
TypeScript
typescript
import { query } from '@tencent-ai/agent-sdk';
import apiGatewayServer from './api-gateway-server';
const result = query({
prompt: 'Search for popular repositories and send a message to Slack',
options: {
mcpServers: {
'api-gateway': apiGatewayServer,
},
canUseTool: (toolCall) => {
// Only allow GitHub search tool
const allowedTools = [
'mcp__api-gateway__github_search_repos',
];
if (!allowedTools.includes(toolCall.name)) {
return false;
}
return true;
},
},
});
Python
python
from codebuddy_agent_sdk import query
from api_gateway_server import server as api_gateway_server

result = query(
    prompt='Search for popular repositories and send a message to Slack',
    options={
        'mcp_servers': {
            'api-gateway': api_gateway_server,
        },
        # Only allow the GitHub search tool
        'can_use_tool': lambda tool_call: (
            tool_call.get('name') == 'mcp__api-gateway__github_search_repos'
        ),
    },
)
Error Handling
TypeScript - API Call Error Handling
typescript
import { createSdkMcpServer, tool } from '@tencent-ai/agent-sdk/mcp';
import { z } from 'zod';
const apiServer = createSdkMcpServer('api-tools', {
tools: [
tool({
name: 'fetch_data',
description: 'Fetch data from an API',
schema: z.object({
endpoint: z.string().url().describe('API endpoint URL'),
}),
handler: async ({ endpoint }) => {
try {
const response = await fetch(endpoint);
if (!response.ok) {
return {
content: [{
type: 'text',
text: `API error: ${response.status} ${response.statusText}`,
}],
};
}
const data = await response.json();
return {
content: [{
type: 'text',
text: JSON.stringify(data, null, 2),
}],
};
} catch (error) {
return {
content: [{
type: 'text',
text: `Failed to fetch data: ${error instanceof Error ? error.message : String(error)}`,
}],
};
}
},
}),
],
});
Python - API Call Error Handling
python
import json
from typing import Any

import aiohttp

from codebuddy_agent_sdk import create_sdk_mcp_server

server = create_sdk_mcp_server('api-tools')

@server.tool()
async def fetch_data(endpoint: str) -> dict[str, Any]:
    """Fetch data from an API"""
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(endpoint) as response:
                # Treat 4xx/5xx statuses as errors; other 2xx codes such as 201 are successes
                if response.status >= 400:
                    return {
                        'content': [{
                            'type': 'text',
                            'text': f'API error: {response.status} {response.reason}'
                        }]
                    }
                data = await response.json()
                return {
                    'content': [{
                        'type': 'text',
                        'text': json.dumps(data, indent=2)
                    }]
                }
    except Exception as e:
        return {
            'content': [{
                'type': 'text',
                'text': f'Failed to fetch data: {str(e)}'
            }]
        }
Best Practices
1. Use Clear Parameter Types and Descriptions
Provide clear types and descriptions for tool parameters to help the Agent understand how to call each tool:
typescript
tool({
name: 'process_data',
description: 'Process a list of strings and return it in the requested format',
schema: z.object({
data: z.array(z.string()).describe('Data to process'),
format: z.enum(['json', 'csv']).describe('Output format'),
}),
handler: async ({ data, format }) => {
// Processing logic
},
})
2. Provide Meaningful Error Feedback
Always return clear error messages so that the Agent and users understand what happened:
typescript
handler: async (input) => {
try {
// Execute operation
} catch (error) {
return {
content: [{
type: 'text',
text: `Operation failed: ${error instanceof Error ? error.message : String(error)}`,
}],
};
}
}
3. Validate Input Parameters
Ensure inputs conform to expected formats and ranges:
typescript
handler: async ({ userId, email }) => {
if (!Number.isInteger(userId) || userId <= 0) {
return {
content: [{
type: 'text',
text: 'Error: User ID must be a positive integer',
}],
};
}
if (!email.includes('@')) {
return {
content: [{
type: 'text',
text: 'Error: Invalid email format',
}],
};
}
// Continue processing
}
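The same validation can be written on the Python side. The sketch below is plain Python with no SDK imports; `text_result` and `validate_user` are hypothetical helper names, and the returned dictionaries mirror the `content` payload shape used in the Error Handling examples. The email pattern is a deliberately loose shape check, not a full address validator.

```python
import re
from typing import Any, Dict, Optional

# Loose shape check: something@something.tld, no whitespace.
EMAIL_RE = re.compile(r'^[^@\s]+@[^@\s]+\.[^@\s]+$')


def text_result(text: str) -> Dict[str, Any]:
    """Wrap a message in the text content payload shape."""
    return {'content': [{'type': 'text', 'text': text}]}


def validate_user(user_id: Any, email: str) -> Optional[Dict[str, Any]]:
    """Return an error payload if the input is invalid, otherwise None."""
    # bool is a subclass of int in Python, so reject it explicitly.
    if isinstance(user_id, bool) or not isinstance(user_id, int) or user_id <= 0:
        return text_result('Error: User ID must be a positive integer')
    if not EMAIL_RE.match(email):
        return text_result('Error: Invalid email format')
    return None
```

A handler would call `validate_user` first and return its result whenever it is not `None`, continuing with the real work otherwise.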