#!/usr/bin/env python3 """ MySQL MCP Server 测试脚本 """ import subprocess import json import time import sys import os from pathlib import Path class MCPClient: """MCP 客户端""" def __init__(self, binary_path): """初始化客户端""" self.binary_path = binary_path self.process = None self.request_id = 0 def start(self): """启动 MCP 服务器""" print("Starting MCP Server...") self.process = subprocess.Popen( [self.binary_path], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1 ) time.sleep(1) # 等待服务器启动 print("✓ MCP Server started") def stop(self): """停止服务器""" if self.process: self.process.terminate() self.process.wait() print("✓ MCP Server stopped") def send_request(self, method, params=None): """发送请求""" self.request_id += 1 request = { "jsonrpc": "2.0", "id": self.request_id, "method": method, "params": params or {} } json_str = json.dumps(request) print(f"\n→ Sending: {method}") print(f" Request: {json_str}") self.process.stdin.write(json_str + "\n") self.process.stdin.flush() # 读取响应 response_str = self.process.stdout.readline() print(f" Response: {response_str.strip()}") try: response = json.loads(response_str) return response except json.JSONDecodeError as e: print(f" Error parsing response: {e}") return None def test_mcp_server(): """测试 MCP 服务器""" # 获取二进制文件路径 script_dir = Path(__file__).parent binary_path = script_dir / "mcp-server.exe" if not binary_path.exists(): print(f"Error: Binary not found at {binary_path}") print("Please build the project first: go build -o mcp-server.exe main.go") return False client = MCPClient(str(binary_path)) try: # 启动服务器 client.start() # 测试 1: 初始化 print("\n" + "="*50) print("Test 1: Initialize") print("="*50) response = client.send_request("initialize") if response and "result" in response: print("✓ Initialize successful") else: print("✗ Initialize failed") return False # 测试 2: 获取表列表 print("\n" + "="*50) print("Test 2: Get Tables") print("="*50) response = client.send_request("get_tables") if response and "result" in response: tables = response["result"].get("tables", []) print(f"✓ Get tables successful, found {len(tables)} tables") if tables: print(f" Tables: {', '.join(tables[:5])}") else: print("✗ Get tables failed") return False # 测试 3: 查询数据 print("\n" + "="*50) print("Test 3: Query Data") print("="*50) response = client.send_request("query", { "sql": "SELECT * FROM users LIMIT 5", "args": [] }) if response and "result" in response: count = response["result"].get("count", 0) print(f"✓ Query successful, returned {count} rows") if count > 0: print(f" Sample row: {response['result']['rows'][0]}") else: print("✗ Query failed") if "error" in response: print(f" Error: {response['error']['message']}") # 测试 4: 获取表结构 print("\n" + "="*50) print("Test 4: Get Table Schema") print("="*50) response = client.send_request("get_table_schema", { "table": "users" }) if response and "result" in response: schema = response["result"] print(f"✓ Get schema successful, found {len(schema)} columns") for col in schema[:3]: print(f" - {col['field']}: {col['type']}") else: print("✗ Get schema failed") # 测试 5: 参数化查询 print("\n" + "="*50) print("Test 5: Parameterized Query") print("="*50) response = client.send_request("query", { "sql": "SELECT * FROM users WHERE id = ?", "args": [1] }) if response and "result" in response: count = response["result"].get("count", 0) print(f"✓ Parameterized query successful, returned {count} rows") else: print("✗ Parameterized query failed") print("\n" + "="*50) print("All tests completed!") print("="*50) return True except Exception as e: print(f"Error: {e}") return False finally: client.stop() if __name__ == "__main__": success = test_mcp_server() sys.exit(0 if success else 1)