# yunzer_go/server/mcp-server/test.py
#
# 175 lines
# 5.2 KiB
# Python

#!/usr/bin/env python3
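"""
Test script for the MySQL MCP server.

Launches the server binary as a subprocess and exercises its JSON-RPC
methods over stdin/stdout.
"""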
import subprocess
import json
import time
import sys
import os
from pathlib import Path
class MCPClient:
    """Minimal JSON-RPC 2.0 client that talks to an MCP server over stdio.

    The server binary is launched as a child process. Each request is written
    to the child's stdin as a single line of JSON, and the response is read
    back as a single line from its stdout.
    """

    def __init__(self, binary_path):
        """Store the server binary path; the process is not started here.

        Args:
            binary_path: Path of the MCP server executable to launch.
        """
        self.binary_path = binary_path
        self.process = None
        self.request_id = 0

    def start(self):
        """Launch the MCP server as a child process.

        Raises:
            OSError: If the binary cannot be executed.
            RuntimeError: If the server exits during the startup delay.
        """
        print("Starting MCP Server...")
        self.process = subprocess.Popen(
            [self.binary_path],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            # Inherit the parent's stderr instead of piping it: nothing ever
            # reads a stderr pipe here, so a chatty server would fill the OS
            # pipe buffer and block while we wait on stdout (deadlock).
            stderr=None,
            text=True,
            # JSON is exchanged as UTF-8; do not depend on the locale codec.
            encoding="utf-8",
            bufsize=1,
        )
        time.sleep(1)  # give the server time to start
        exit_code = self.process.poll()
        if exit_code is not None:
            raise RuntimeError(
                f"MCP server exited during startup with code {exit_code}"
            )
        print("✓ MCP Server started")

    def stop(self, timeout=5.0):
        """Stop the server process and release its pipes.

        Does nothing if no process is attached. The process is terminated
        first and killed if it has not exited within *timeout* seconds.

        Args:
            timeout: Seconds to wait for termination before killing.
        """
        if not self.process:
            return
        proc = self.process
        self.process = None  # makes repeated stop() calls harmless
        proc.terminate()
        try:
            proc.wait(timeout=timeout)
        except subprocess.TimeoutExpired:
            proc.kill()
            proc.wait()
        finally:
            for stream in (proc.stdin, proc.stdout):
                if stream is None:
                    continue
                try:
                    stream.close()
                except OSError:
                    # Best-effort cleanup: closing a pipe whose peer is
                    # already gone may fail while flushing buffered data.
                    pass
        print("✓ MCP Server stopped")

    def send_request(self, method, params=None):
        """Send one JSON-RPC request and return the decoded response.

        Args:
            method: JSON-RPC method name.
            params: Optional parameters; an empty object is sent when omitted.

        Returns:
            The decoded JSON response, or None if the request could not be
            written, the server closed its output, or the response line was
            not valid JSON.

        Raises:
            RuntimeError: If no server process is attached (start() not called).
        """
        if self.process is None:
            raise RuntimeError("MCP server is not running; call start() first")

        self.request_id += 1
        request = {
            "jsonrpc": "2.0",
            "id": self.request_id,
            "method": method,
            "params": params or {},
        }
        json_str = json.dumps(request)
        print(f"\n→ Sending: {method}")
        print(f" Request: {json_str}")

        try:
            self.process.stdin.write(json_str + "\n")
            self.process.stdin.flush()
        except OSError as e:
            # Typically a broken pipe because the server has already exited.
            print(f" Error sending request: {e}")
            return None

        # Read the response: one line of JSON per request.
        response_str = self.process.stdout.readline()
        print(f" Response: {response_str.strip()}")
        if not response_str:
            # readline() returns "" only at EOF, i.e. the server closed stdout.
            print(" Error: server closed its output without responding")
            return None
        try:
            return json.loads(response_str)
        except json.JSONDecodeError as e:
            print(f" Error parsing response: {e}")
            return None
def _print_banner(title):
    """Print *title* framed by two separator lines."""
    separator = "=" * 50
    print("\n" + separator)
    print(title)
    print(separator)


def _print_failure(label, response):
    """Print a failure line for *label* plus the server's error message, if any.

    Tolerates a missing response (None) and error objects without a message.
    """
    print(f"✗ {label} failed")
    error = response.get("error") if isinstance(response, dict) else None
    if isinstance(error, dict) and "message" in error:
        print(f" Error: {error['message']}")


def test_mcp_server():
    """Run the MCP server checks against the locally built binary.

    Starts ``mcp-server.exe`` from this script's directory and sends
    ``initialize``, ``get_tables``, ``query`` and ``get_table_schema``
    requests, printing the outcome of each.

    Returns:
        bool: False if the binary is missing, if the initialize or get_tables
        check fails, or if an unexpected exception occurs. True otherwise;
        failures of the query and schema checks (tests 3-5) are reported but
        do not change the result.
    """
    # Locate the server binary next to this script.
    script_dir = Path(__file__).parent
    binary_path = script_dir / "mcp-server.exe"
    if not binary_path.exists():
        print(f"Error: Binary not found at {binary_path}")
        print("Please build the project first: go build -o mcp-server.exe main.go")
        return False

    client = MCPClient(str(binary_path))
    try:
        client.start()

        # Test 1: initialize (fatal on failure)
        _print_banner("Test 1: Initialize")
        response = client.send_request("initialize")
        if response and "result" in response:
            print("✓ Initialize successful")
        else:
            _print_failure("Initialize", response)
            return False

        # Test 2: list tables (fatal on failure)
        _print_banner("Test 2: Get Tables")
        response = client.send_request("get_tables")
        if response and "result" in response:
            tables = response["result"].get("tables", [])
            print(f"✓ Get tables successful, found {len(tables)} tables")
            if tables:
                print(f" Tables: {', '.join(str(t) for t in tables[:5])}")
        else:
            _print_failure("Get tables", response)
            return False

        # Test 3: plain query (non-fatal)
        _print_banner("Test 3: Query Data")
        response = client.send_request("query", {
            "sql": "SELECT * FROM users LIMIT 5",
            "args": [],
        })
        if response and "result" in response:
            result = response["result"]
            count = result.get("count", 0)
            print(f"✓ Query successful, returned {count} rows")
            # Guard against a positive count without a usable "rows" list.
            rows = result.get("rows") or []
            if count > 0 and rows:
                print(f" Sample row: {rows[0]}")
        else:
            # response may be None here; the helper handles that case.
            _print_failure("Query", response)

        # Test 4: table schema (non-fatal)
        _print_banner("Test 4: Get Table Schema")
        response = client.send_request("get_table_schema", {
            "table": "users",
        })
        if response and "result" in response:
            schema = response["result"]
            print(f"✓ Get schema successful, found {len(schema)} columns")
            for col in schema[:3]:
                print(f" - {col['field']}: {col['type']}")
        else:
            _print_failure("Get schema", response)

        # Test 5: parameterized query (non-fatal)
        _print_banner("Test 5: Parameterized Query")
        response = client.send_request("query", {
            "sql": "SELECT * FROM users WHERE id = ?",
            "args": [1],
        })
        if response and "result" in response:
            count = response["result"].get("count", 0)
            print(f"✓ Parameterized query successful, returned {count} rows")
        else:
            _print_failure("Parameterized query", response)

        _print_banner("All tests completed!")
        return True
    except Exception as e:
        # Top-level boundary of the script: report and signal failure.
        print(f"Error: {e}")
        return False
    finally:
        client.stop()
if __name__ == "__main__":
    # The process exit status mirrors the test outcome: 0 on success, 1 otherwise.
    exit_code = 0 if test_mcp_server() else 1
    sys.exit(exit_code)