175 lines
5.2 KiB
Python
175 lines
5.2 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
MySQL MCP Server 测试脚本
|
|
"""
|
|
|
|
import subprocess
|
|
import json
|
|
import time
|
|
import sys
|
|
import os
|
|
from pathlib import Path
|
|
|
|
class MCPClient:
|
|
"""MCP 客户端"""
|
|
|
|
def __init__(self, binary_path):
|
|
"""初始化客户端"""
|
|
self.binary_path = binary_path
|
|
self.process = None
|
|
self.request_id = 0
|
|
|
|
def start(self):
|
|
"""启动 MCP 服务器"""
|
|
print("Starting MCP Server...")
|
|
self.process = subprocess.Popen(
|
|
[self.binary_path],
|
|
stdin=subprocess.PIPE,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
text=True,
|
|
bufsize=1
|
|
)
|
|
time.sleep(1) # 等待服务器启动
|
|
print("✓ MCP Server started")
|
|
|
|
def stop(self):
|
|
"""停止服务器"""
|
|
if self.process:
|
|
self.process.terminate()
|
|
self.process.wait()
|
|
print("✓ MCP Server stopped")
|
|
|
|
def send_request(self, method, params=None):
|
|
"""发送请求"""
|
|
self.request_id += 1
|
|
request = {
|
|
"jsonrpc": "2.0",
|
|
"id": self.request_id,
|
|
"method": method,
|
|
"params": params or {}
|
|
}
|
|
|
|
json_str = json.dumps(request)
|
|
print(f"\n→ Sending: {method}")
|
|
print(f" Request: {json_str}")
|
|
|
|
self.process.stdin.write(json_str + "\n")
|
|
self.process.stdin.flush()
|
|
|
|
# 读取响应
|
|
response_str = self.process.stdout.readline()
|
|
print(f" Response: {response_str.strip()}")
|
|
|
|
try:
|
|
response = json.loads(response_str)
|
|
return response
|
|
except json.JSONDecodeError as e:
|
|
print(f" Error parsing response: {e}")
|
|
return None
|
|
|
|
def test_mcp_server():
|
|
"""测试 MCP 服务器"""
|
|
|
|
# 获取二进制文件路径
|
|
script_dir = Path(__file__).parent
|
|
binary_path = script_dir / "mcp-server.exe"
|
|
|
|
if not binary_path.exists():
|
|
print(f"Error: Binary not found at {binary_path}")
|
|
print("Please build the project first: go build -o mcp-server.exe main.go")
|
|
return False
|
|
|
|
client = MCPClient(str(binary_path))
|
|
|
|
try:
|
|
# 启动服务器
|
|
client.start()
|
|
|
|
# 测试 1: 初始化
|
|
print("\n" + "="*50)
|
|
print("Test 1: Initialize")
|
|
print("="*50)
|
|
response = client.send_request("initialize")
|
|
if response and "result" in response:
|
|
print("✓ Initialize successful")
|
|
else:
|
|
print("✗ Initialize failed")
|
|
return False
|
|
|
|
# 测试 2: 获取表列表
|
|
print("\n" + "="*50)
|
|
print("Test 2: Get Tables")
|
|
print("="*50)
|
|
response = client.send_request("get_tables")
|
|
if response and "result" in response:
|
|
tables = response["result"].get("tables", [])
|
|
print(f"✓ Get tables successful, found {len(tables)} tables")
|
|
if tables:
|
|
print(f" Tables: {', '.join(tables[:5])}")
|
|
else:
|
|
print("✗ Get tables failed")
|
|
return False
|
|
|
|
# 测试 3: 查询数据
|
|
print("\n" + "="*50)
|
|
print("Test 3: Query Data")
|
|
print("="*50)
|
|
response = client.send_request("query", {
|
|
"sql": "SELECT * FROM users LIMIT 5",
|
|
"args": []
|
|
})
|
|
if response and "result" in response:
|
|
count = response["result"].get("count", 0)
|
|
print(f"✓ Query successful, returned {count} rows")
|
|
if count > 0:
|
|
print(f" Sample row: {response['result']['rows'][0]}")
|
|
else:
|
|
print("✗ Query failed")
|
|
if "error" in response:
|
|
print(f" Error: {response['error']['message']}")
|
|
|
|
# 测试 4: 获取表结构
|
|
print("\n" + "="*50)
|
|
print("Test 4: Get Table Schema")
|
|
print("="*50)
|
|
response = client.send_request("get_table_schema", {
|
|
"table": "users"
|
|
})
|
|
if response and "result" in response:
|
|
schema = response["result"]
|
|
print(f"✓ Get schema successful, found {len(schema)} columns")
|
|
for col in schema[:3]:
|
|
print(f" - {col['field']}: {col['type']}")
|
|
else:
|
|
print("✗ Get schema failed")
|
|
|
|
# 测试 5: 参数化查询
|
|
print("\n" + "="*50)
|
|
print("Test 5: Parameterized Query")
|
|
print("="*50)
|
|
response = client.send_request("query", {
|
|
"sql": "SELECT * FROM users WHERE id = ?",
|
|
"args": [1]
|
|
})
|
|
if response and "result" in response:
|
|
count = response["result"].get("count", 0)
|
|
print(f"✓ Parameterized query successful, returned {count} rows")
|
|
else:
|
|
print("✗ Parameterized query failed")
|
|
|
|
print("\n" + "="*50)
|
|
print("All tests completed!")
|
|
print("="*50)
|
|
return True
|
|
|
|
except Exception as e:
|
|
print(f"Error: {e}")
|
|
return False
|
|
finally:
|
|
client.stop()
|
|
|
|
if __name__ == "__main__":
|
|
success = test_mcp_server()
|
|
sys.exit(0 if success else 1)
|