| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- MCP Stdio 工具 - 修复编码问题
- 确保所有输出都使用UTF-8编码
- """
- import json
- import sys
- import os
- from typing import Dict, Any
- import logging
- # 强制使用UTF-8编码
- if sys.platform == "win32":
- # Windows需要特殊处理
- import io
- sys.stdin = io.TextIOWrapper(sys.stdin.buffer, encoding='utf-8')
- sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
- sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8')
- else:
- # Unix-like系统
- sys.stdin.reconfigure(encoding='utf-8')
- sys.stdout.reconfigure(encoding='utf-8')
- sys.stderr.reconfigure(encoding='utf-8')
- # 设置环境变量
- os.environ['PYTHONIOENCODING'] = 'utf-8'
- os.environ['PYTHONUTF8'] = '1'
- # 配置日志
- logging.basicConfig(
- level=logging.INFO,
- format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
- encoding='utf-8'
- )
- logger = logging.getLogger("mcp-tool")
- class FixedMCPServer:
- """修复编码问题的MCP服务器"""
- def __init__(self):
- self.tools = {}
- self.initialize_tools()
- def initialize_tools(self):
- """初始化工具集"""
- # 获取时间
- self.tools["get_time"] = {
- "name": "get_time",
- "description": "获取当前时间",
- "inputSchema": {
- "type": "object",
- "properties": {
- "format": {
- "type": "string",
- "description": "时间格式",
- "enum": ["iso", "timestamp", "human", "chinese"],
- "default": "iso"
- }
- }
- }
- }
- # 文本处理工具
- self.tools["text_process"] = {
- "name": "text_process",
- "description": "文本处理工具",
- "inputSchema": {
- "type": "object",
- "properties": {
- "text": {
- "type": "string",
- "description": "输入文本"
- },
- "operation": {
- "type": "string",
- "description": "操作类型",
- "enum": ["length", "upper", "lower", "reverse", "count_words"],
- "default": "length"
- }
- },
- "required": ["text"]
- }
- }
- # 数据格式工具
- self.tools["format_data"] = {
- "name": "format_data",
- "description": "格式化数据",
- "inputSchema": {
- "type": "object",
- "properties": {
- "data": {
- "type": "string",
- "description": "原始数据"
- },
- "format": {
- "type": "string",
- "description": "格式类型",
- "enum": ["json", "yaml", "xml"],
- "default": "json"
- }
- },
- "required": ["data"]
- }
- }
- def handle_request(self, request: Dict[str, Any]) -> Dict[str, Any]:
- """处理请求"""
- try:
- method = request.get("method")
- params = request.get("params", {})
- if method == "tools/list":
- return self.handle_tools_list()
- elif method == "tools/call":
- return self.handle_tool_call(params)
- elif method == "ping":
- return {"result": "pong"}
- else:
- return self.create_error_response(
- code=-32601,
- message="Method not found"
- )
- except Exception as e:
- logger.error(f"Error handling request: {e}")
- return self.create_error_response(
- code=-32603,
- message=f"Internal error: {str(e)}"
- )
- def handle_tools_list(self) -> Dict[str, Any]:
- """列出所有工具 - 确保返回标准JSON"""
- return {
- "result": {
- "tools": list(self.tools.values())
- }
- }
- def handle_tool_call(self, params: Dict[str, Any]) -> Dict[str, Any]:
- """调用工具 - 修复响应格式"""
- name = params.get("name")
- arguments = params.get("arguments", {})
- if name not in self.tools:
- return self.create_error_response(
- code=-32602,
- message=f"Tool '{name}' not found"
- )
- try:
- if name == "get_time":
- result = self.execute_get_time(arguments)
- elif name == "text_process":
- result = self.execute_text_process(arguments)
- elif name == "format_data":
- result = self.execute_format_data(arguments)
- else:
- return self.create_error_response(
- code=-32602,
- message="Tool not implemented"
- )
- # 确保返回正确的MCP响应格式
- return self.create_success_response(result)
- except Exception as e:
- logger.error(f"Tool execution error: {e}")
- return self.create_error_response(
- code=-32603,
- message=f"Tool execution failed: {str(e)}"
- )
- def execute_get_time(self, args: Dict[str, Any]) -> Dict[str, Any]:
- """获取时间 - 支持中文"""
- from datetime import datetime
- try:
- format_type = args.get("format", "iso")
- now = datetime.now()
- if format_type == "iso":
- result = now.isoformat()
- elif format_type == "timestamp":
- result = now.timestamp()
- elif format_type == "human":
- result = now.strftime("%Y-%m-%d %H:%M:%S")
- elif format_type == "chinese":
- result = now.strftime("%Y年%m月%d日 %H时%M分%S秒")
- else:
- result = now.isoformat()
- logger.info(f"当前系统时间:{result}")
- return {
- "status": "success",
- "format": format_type,
- "time": result,
- "timestamp": now.timestamp(),
- "date": now.strftime("%Y-%m-%d"),
- "time_12h": now.strftime("%I:%M:%S %p")
- }
- except Exception as e:
- return {
- "status": "error",
- "error": str(e)
- }
- def execute_text_process(self, args: Dict[str, Any]) -> Dict[str, Any]:
- """文本处理"""
- try:
- text = args.get("text", "")
- operation = args.get("operation", "length")
- if operation == "length":
- result = len(text)
- result_str = f"文本长度: {result} 个字符"
- elif operation == "upper":
- result = text.upper()
- result_str = f"大写: {result}"
- elif operation == "lower":
- result = text.lower()
- result_str = f"小写: {result}"
- elif operation == "reverse":
- result = text[::-1]
- result_str = f"反转: {result}"
- elif operation == "count_words":
- words = len(text.split())
- result = words
- result_str = f"单词数: {words}"
- else:
- raise ValueError(f"未知操作: {operation}")
- return {
- "status": "success",
- "operation": operation,
- "original_text": text,
- "result": result,
- "result_str": result_str,
- "text_length": len(text)
- }
- except Exception as e:
- return {
- "status": "error",
- "error": str(e),
- "operation": args.get("operation", "")
- }
- def execute_format_data(self, args: Dict[str, Any]) -> Dict[str, Any]:
- """格式化数据"""
- try:
- data_str = args.get("data", "")
- format_type = args.get("format", "json")
- # 尝试解析为JSON
- try:
- data = json.loads(data_str)
- is_json = True
- except:
- data = data_str
- is_json = False
- if format_type == "json":
- if is_json:
- result = json.dumps(data, ensure_ascii=False, indent=2)
- else:
- # 如果不是JSON,包装成JSON
- result = json.dumps({"text": data}, ensure_ascii=False, indent=2)
- elif format_type == "yaml":
- import yaml
- result = yaml.dump(data, allow_unicode=True, default_flow_style=False)
- elif format_type == "xml":
- # 简单的XML格式化
- if isinstance(data, dict):
- result = "<data>"
- for k, v in data.items():
- result += f"\n <{k}>{v}</{k}>"
- result += "\n</data>"
- else:
- result = f"<text>{data}</text>"
- else:
- result = str(data)
- return {
- "status": "success",
- "format": format_type,
- "original": data_str,
- "formatted": result,
- "length": len(result)
- }
- except Exception as e:
- return {
- "status": "error",
- "error": str(e),
- "format": args.get("format", "")
- }
- def create_success_response(self, data: Dict[str, Any]) -> Dict[str, Any]:
- """创建成功响应 - 确保符合MCP规范"""
- # 将数据转换为JSON字符串作为文本内容
- content_text = json.dumps(data, ensure_ascii=False, indent=2)
- return {
- "result": {
- "content": [
- {
- "type": "text",
- "text": content_text
- }
- ],
- "isError": False
- }
- }
- def create_error_response(self, code: int, message: str) -> Dict[str, Any]:
- """创建错误响应"""
- return {
- "error": {
- "code": code,
- "message": message
- }
- }
- def safe_json_dump(data: Dict[str, Any]) -> str:
- """安全的JSON序列化,确保UTF-8编码"""
- try:
- return json.dumps(data, ensure_ascii=False, separators=(',', ':'))
- except:
- # 如果失败,使用ASCII转义
- return json.dumps(data, ensure_ascii=True, separators=(',', ':'))
- def main():
- """主函数 - 修复Stdio通信"""
- logger.info("启动MCP Stdio服务器 (修复编码版)...")
- server = FixedMCPServer()
- # 初始握手消息
- init_message = {
- "jsonrpc": "2.0",
- "id": 1,
- "result": {
- "protocolVersion": "2024-11-05",
- "capabilities": {
- "tools": {}
- },
- "serverInfo": {
- "name": "fixed-mcp-server",
- "version": "1.0.0"
- }
- }
- }
- # 发送初始化响应
- try:
- sys.stdout.write(safe_json_dump(init_message) + "\n")
- sys.stdout.flush()
- except Exception as e:
- logger.error(f"发送初始化消息失败: {e}")
- return
- logger.info("MCP服务器已初始化")
- # 主循环
- line_num = 0
- while True:
- try:
- line = sys.stdin.readline()
- if not line:
- logger.info("输入流结束")
- break
- line = line.strip()
- line_num += 1
- if not line:
- continue
- logger.info(f"收到第 {line_num} 行: {line[:100]}...")
- try:
- request = json.loads(line)
- logger.info(f"解析请求: {request.get('method', 'unknown')}")
- # 处理请求
- response = server.handle_request(request)
- response["jsonrpc"] = "2.0"
- response["id"] = request.get("id")
- # 发送响应
- response_json = safe_json_dump(response)
- sys.stdout.write(response_json + "\n")
- sys.stdout.flush()
- logger.info(f"发送响应: {response.get('result', response.get('error', {}))}")
- except json.JSONDecodeError as e:
- logger.error(f"JSON解析错误: {e}")
- error_response = {
- "jsonrpc": "2.0",
- "error": {
- "code": -32700,
- "message": f"Parse error at line {line_num}"
- },
- "id": None
- }
- sys.stdout.write(safe_json_dump(error_response) + "\n")
- sys.stdout.flush()
- except KeyboardInterrupt:
- logger.info("接收到中断信号")
- break
- except Exception as e:
- logger.error(f"未处理的错误: {e}")
- break
- logger.info("MCP服务器已停止")
- if __name__ == "__main__":
- main()
|