Skip to content

在 AI 应用中接入 MCP

NOTE

本文档是实践教程,侧重于完整的应用集成流程。如需了解单个功能的配置方法(如身份验证、语义描述等),请参阅操作指南中的 MCP 集成 章节。

除了现有的 AI 编辑器,你也可以在自定义 AI 应用中通过 MCP 协议连接数据墙DBW,让应用以受控方式访问数据库。本教程演示如何在 Python 应用中通过 MCP 调用数据墙DBW 的数据操作工具。

场景

构建一个智能库存分析助手,能够回答用户关于产品库存的自然语言问题,并执行数据操作。AI 应用不直接写 SQL——通过 MCP 工具调用数据墙DBW,由引擎生成安全的数据库查询。

NOTE

MCP 功能要求数据墙DBW 1.7 或更高版本。Stdio 模式下传入请求大小限制为 1 MB。

架构

text
用户 → AI 应用(Python)
         ↓ MCP 协议
      数据墙DBW(Stdio 子进程)
         ↓ SQL
      数据库

应用以子进程方式启动数据墙DBW(Stdio 模式),通过标准输入/输出发送 MCP 协议消息。

第一步:准备数据墙DBW

连接本地 MCP 客户端,确保产品数据库和配置文件已就绪。关键配置:

  • runtime.mcp.enabled: true
  • 实体配置了 anonymous 角色的 read 权限
  • 字段已添加描述

第二步:安装 MCP SDK

以 Python 为例:

bash
pip install mcp

mcp 是 MCP 协议的官方 Python SDK,提供了客户端接口来连接 MCP 服务器。

第三步:连接数据墙DBW

python
import asyncio
import json
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

def create_server_params(role="anonymous"):
    """创建数据墙DBW Stdio 服务器参数"""
    return StdioServerParameters(
        command="dab",
        args=[
            "start",
            "--mcp-stdio",
            f"role:{role}",
            "--config", "./dab-config.json",
            "--LogLevel", "error"
        ]
    )

async def run_with_dab(role="anonymous"):
    """启动数据墙DBW 作为 Stdio 子进程并执行操作"""
    server_params = create_server_params(role)
    
    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            # 在这里执行所有 MCP 操作
            yield session

使用示例:

python
async def main():
    # 只读操作使用 anonymous 角色
    async for session in run_with_dab(role="anonymous"):
        tools = await session.list_tools()
        for tool in tools.tools:
            print(f"  - {tool.name}: {tool.description}")

第四步:发现可用工具

python
async def list_available_tools(session):
    """列出数据墙DBW 暴露的全部 MCP 工具"""
    result = await session.list_tools()
    for tool in result.tools:
        print(f"  - {tool.name}: {tool.description}")
    return result.tools

输出示例:

  - describe_entities: 列出可用实体及其字段和操作权限
  - read_records: 从表或视图查询数据
  - create_record: 创建记录
  - update_record: 更新记录
  - delete_record: 删除记录
  - execute_entity: 执行存储过程
  - aggregate_records: 执行聚合统计查询

第五步:了解数据库结构

python
async def explore_database(session):
    """获取实体结构信息"""
    result = await session.call_tool("describe_entities", {})
    # describe_entities 返回 JSON 字符串,需要解析
    data = json.loads(result.content[0].text)
    
    for entity in data.get("entities", []):
        print(f"实体: {entity['name']}")
        print(f"  说明: {entity.get('description', '无')}")
        print(f"  可用操作: {', '.join(entity.get('operations', []))}")
        for field in entity.get("fields", []):
            print(f"  - {field['name']}: {field.get('description', '无说明')}")
        print()

利用 describe_entities 返回的 schema 信息,应用可以理解数据库中有哪些表、字段类型和业务含义。建议在应用启动时调用此工具,动态适配数据库结构变化。

第六步:查询数据

python
async def query_products(session, inventory_threshold=30):
    """查询库存低于指定阈值的产品"""
    result = await session.call_tool("read_records", {
        "entity": "Product",
        "filter": {
            "Inventory": {"lt": inventory_threshold}
        },
        "orderby": ["Inventory ASC"]
    })

    # read_records 返回 JSON 字符串
    products = json.loads(result.content[0].text)
    return products

第七步:计算聚合

python
async def get_inventory_stats(session):
    """获取库存统计(示例:计算平均价格)"""
    result = await session.call_tool("aggregate_records", {
        "entity": "Product",
        "function": "avg",
        "field": "Price"
    })

    # aggregate_records 返回 JSON 字符串
    stats = json.loads(result.content[0].text)
    return stats

第八步:组合为智能助手

将以上步骤组合成一个完整的交互式应用:

python
async def main():
    # 只读查询使用 anonymous 角色
    async for session in run_with_dab(role="anonymous"):
        # 用户问题
        question = "库存低于 30 的产品有哪些?它们的平均价格是多少?"

        # 1. 查询低库存产品
        products = await query_products(session, inventory_threshold=30)
        print(f"库存不足的产品: {products}")

        # 2. 统计所有产品的平均价格
        stats = await get_inventory_stats(session)
        print(f"库存统计: {stats}")

asyncio.run(main())

AI 应用的关键设计点

1. 不要自己写 SQL

应用通过 MCP 工具调用数据墙DBW,由引擎生成确定性的 SQL。不要在自己的代码中拼接 SQL 语句——这会绕过权限控制和安全边界。

2. 利用 describe_entities 动态理解 schema

应用启动时先调用 describe_entities 获取数据库结构。这样即使数据库增加了新表或新字段,应用也能自动适配,不需要硬编码表结构。

3. 角色权限分层

python
# 只读操作 → anonymous 角色
async for session in run_with_dab(role="anonymous"):
    products = await query_products(session)

# 写入操作 → authenticated 角色(需要在配置中授予相应权限)
async for session in run_with_dab(role="authenticated"):
    result = await session.call_tool("create_record", {
        "entity": "Product",
        "data": {"Name": "New Product", "Inventory": 100, "Price": 19.99}
    })

在应用中为不同的操作使用不同的角色连接,确保只读操作不会意外修改数据。角色名称必须与配置文件中实体 permissions 定义的角色一致。

4. 错误处理

python
try:
    result = await session.call_tool("read_records", {
        "entity": "Product",
        "filter": {"Inventory": {"lt": 30}}
    })
except Exception as e:
    # 可能是权限不足、实体不存在、数据库连接失败
    print(f"查询失败: {e}")

5. 连接复用

MCP 会话(Session)创建成本较高。在应用生命周期内保持一个会话,多个查询复用同一会话,而不是每次查询都创建新连接。

6. 限制与注意事项

  • Stdio 模式身份验证:Stdio 模式下自动使用 Simulator 身份验证,无需配置 JWT。角色通过 role:<role-name> 参数指定。
  • 请求大小限制:Stdio 模式下传入请求大小限制为 1 MB。
  • 不支持 JOINread_records 工具是为单表或单视图设计的,不支持 JOIN 操作。对于复杂查询,推荐使用视图或存储过程。
  • 字段元数据依赖配置describe_entities 返回的字段信息来自配置文件中的 fields 数据。如果未提供字段元数据,代理只能看到实体名,fields 数组会为空。

Node.js 示例

javascript
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js";

async function run() {
    const transport = new StdioClientTransport({
        command: "dab",
        args: [
            "start", "--mcp-stdio", "role:anonymous",
            "--config", "./dab-config.json", "--LogLevel", "error"
        ]
    });

    const client = new Client({ name: "inventory-assistant", version: "1.0.0" });
    
    try {
        await client.connect(transport);
        await client.initialize();

        // 调用工具
        const result = await client.callTool({
            name: "read_records",
            arguments: {
                entity: "Product",
                filter: { Inventory: { lt: 30 } }
            }
        });
        
        console.log("查询结果:", result);
    } catch (error) {
        console.error("MCP 调用失败:", error.message);
    } finally {
        await client.close();
    }
}

run();

下一步

数据墙DBW 产品文档与开发指南。