main.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. MCP Stdio 工具 - 修复编码问题
  5. 确保所有输出都使用UTF-8编码
  6. """
  7. import json
  8. import sys
  9. import os
  10. from typing import Dict, Any
  11. import logging
  12. # 强制使用UTF-8编码
  13. if sys.platform == "win32":
  14. # Windows需要特殊处理
  15. import io
  16. sys.stdin = io.TextIOWrapper(sys.stdin.buffer, encoding='utf-8')
  17. sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
  18. sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8')
  19. else:
  20. # Unix-like系统
  21. sys.stdin.reconfigure(encoding='utf-8')
  22. sys.stdout.reconfigure(encoding='utf-8')
  23. sys.stderr.reconfigure(encoding='utf-8')
  24. # 设置环境变量
  25. os.environ['PYTHONIOENCODING'] = 'utf-8'
  26. os.environ['PYTHONUTF8'] = '1'
  27. # 配置日志
  28. logging.basicConfig(
  29. level=logging.INFO,
  30. format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
  31. encoding='utf-8'
  32. )
  33. logger = logging.getLogger("mcp-tool")
  34. class FixedMCPServer:
  35. """修复编码问题的MCP服务器"""
  36. def __init__(self):
  37. self.tools = {}
  38. self.initialize_tools()
  39. def initialize_tools(self):
  40. """初始化工具集"""
  41. # 获取时间
  42. self.tools["get_time"] = {
  43. "name": "get_time",
  44. "description": "获取当前时间",
  45. "inputSchema": {
  46. "type": "object",
  47. "properties": {
  48. "format": {
  49. "type": "string",
  50. "description": "时间格式",
  51. "enum": ["iso", "timestamp", "human", "chinese"],
  52. "default": "iso"
  53. }
  54. }
  55. }
  56. }
  57. # 文本处理工具
  58. self.tools["text_process"] = {
  59. "name": "text_process",
  60. "description": "文本处理工具",
  61. "inputSchema": {
  62. "type": "object",
  63. "properties": {
  64. "text": {
  65. "type": "string",
  66. "description": "输入文本"
  67. },
  68. "operation": {
  69. "type": "string",
  70. "description": "操作类型",
  71. "enum": ["length", "upper", "lower", "reverse", "count_words"],
  72. "default": "length"
  73. }
  74. },
  75. "required": ["text"]
  76. }
  77. }
  78. # 数据格式工具
  79. self.tools["format_data"] = {
  80. "name": "format_data",
  81. "description": "格式化数据",
  82. "inputSchema": {
  83. "type": "object",
  84. "properties": {
  85. "data": {
  86. "type": "string",
  87. "description": "原始数据"
  88. },
  89. "format": {
  90. "type": "string",
  91. "description": "格式类型",
  92. "enum": ["json", "yaml", "xml"],
  93. "default": "json"
  94. }
  95. },
  96. "required": ["data"]
  97. }
  98. }
  99. def handle_request(self, request: Dict[str, Any]) -> Dict[str, Any]:
  100. """处理请求"""
  101. try:
  102. method = request.get("method")
  103. params = request.get("params", {})
  104. if method == "tools/list":
  105. return self.handle_tools_list()
  106. elif method == "tools/call":
  107. return self.handle_tool_call(params)
  108. elif method == "ping":
  109. return {"result": "pong"}
  110. else:
  111. return self.create_error_response(
  112. code=-32601,
  113. message="Method not found"
  114. )
  115. except Exception as e:
  116. logger.error(f"Error handling request: {e}")
  117. return self.create_error_response(
  118. code=-32603,
  119. message=f"Internal error: {str(e)}"
  120. )
  121. def handle_tools_list(self) -> Dict[str, Any]:
  122. """列出所有工具 - 确保返回标准JSON"""
  123. return {
  124. "result": {
  125. "tools": list(self.tools.values())
  126. }
  127. }
  128. def handle_tool_call(self, params: Dict[str, Any]) -> Dict[str, Any]:
  129. """调用工具 - 修复响应格式"""
  130. name = params.get("name")
  131. arguments = params.get("arguments", {})
  132. if name not in self.tools:
  133. return self.create_error_response(
  134. code=-32602,
  135. message=f"Tool '{name}' not found"
  136. )
  137. try:
  138. if name == "get_time":
  139. result = self.execute_get_time(arguments)
  140. elif name == "text_process":
  141. result = self.execute_text_process(arguments)
  142. elif name == "format_data":
  143. result = self.execute_format_data(arguments)
  144. else:
  145. return self.create_error_response(
  146. code=-32602,
  147. message="Tool not implemented"
  148. )
  149. # 确保返回正确的MCP响应格式
  150. return self.create_success_response(result)
  151. except Exception as e:
  152. logger.error(f"Tool execution error: {e}")
  153. return self.create_error_response(
  154. code=-32603,
  155. message=f"Tool execution failed: {str(e)}"
  156. )
  157. def execute_get_time(self, args: Dict[str, Any]) -> Dict[str, Any]:
  158. """获取时间 - 支持中文"""
  159. from datetime import datetime
  160. try:
  161. format_type = args.get("format", "iso")
  162. now = datetime.now()
  163. if format_type == "iso":
  164. result = now.isoformat()
  165. elif format_type == "timestamp":
  166. result = now.timestamp()
  167. elif format_type == "human":
  168. result = now.strftime("%Y-%m-%d %H:%M:%S")
  169. elif format_type == "chinese":
  170. result = now.strftime("%Y年%m月%d日 %H时%M分%S秒")
  171. else:
  172. result = now.isoformat()
  173. logger.info(f"当前系统时间:{result}")
  174. return {
  175. "status": "success",
  176. "format": format_type,
  177. "time": result,
  178. "timestamp": now.timestamp(),
  179. "date": now.strftime("%Y-%m-%d"),
  180. "time_12h": now.strftime("%I:%M:%S %p")
  181. }
  182. except Exception as e:
  183. return {
  184. "status": "error",
  185. "error": str(e)
  186. }
  187. def execute_text_process(self, args: Dict[str, Any]) -> Dict[str, Any]:
  188. """文本处理"""
  189. try:
  190. text = args.get("text", "")
  191. operation = args.get("operation", "length")
  192. if operation == "length":
  193. result = len(text)
  194. result_str = f"文本长度: {result} 个字符"
  195. elif operation == "upper":
  196. result = text.upper()
  197. result_str = f"大写: {result}"
  198. elif operation == "lower":
  199. result = text.lower()
  200. result_str = f"小写: {result}"
  201. elif operation == "reverse":
  202. result = text[::-1]
  203. result_str = f"反转: {result}"
  204. elif operation == "count_words":
  205. words = len(text.split())
  206. result = words
  207. result_str = f"单词数: {words}"
  208. else:
  209. raise ValueError(f"未知操作: {operation}")
  210. return {
  211. "status": "success",
  212. "operation": operation,
  213. "original_text": text,
  214. "result": result,
  215. "result_str": result_str,
  216. "text_length": len(text)
  217. }
  218. except Exception as e:
  219. return {
  220. "status": "error",
  221. "error": str(e),
  222. "operation": args.get("operation", "")
  223. }
  224. def execute_format_data(self, args: Dict[str, Any]) -> Dict[str, Any]:
  225. """格式化数据"""
  226. try:
  227. data_str = args.get("data", "")
  228. format_type = args.get("format", "json")
  229. # 尝试解析为JSON
  230. try:
  231. data = json.loads(data_str)
  232. is_json = True
  233. except:
  234. data = data_str
  235. is_json = False
  236. if format_type == "json":
  237. if is_json:
  238. result = json.dumps(data, ensure_ascii=False, indent=2)
  239. else:
  240. # 如果不是JSON,包装成JSON
  241. result = json.dumps({"text": data}, ensure_ascii=False, indent=2)
  242. elif format_type == "yaml":
  243. import yaml
  244. result = yaml.dump(data, allow_unicode=True, default_flow_style=False)
  245. elif format_type == "xml":
  246. # 简单的XML格式化
  247. if isinstance(data, dict):
  248. result = "<data>"
  249. for k, v in data.items():
  250. result += f"\n <{k}>{v}</{k}>"
  251. result += "\n</data>"
  252. else:
  253. result = f"<text>{data}</text>"
  254. else:
  255. result = str(data)
  256. return {
  257. "status": "success",
  258. "format": format_type,
  259. "original": data_str,
  260. "formatted": result,
  261. "length": len(result)
  262. }
  263. except Exception as e:
  264. return {
  265. "status": "error",
  266. "error": str(e),
  267. "format": args.get("format", "")
  268. }
  269. def create_success_response(self, data: Dict[str, Any]) -> Dict[str, Any]:
  270. """创建成功响应 - 确保符合MCP规范"""
  271. # 将数据转换为JSON字符串作为文本内容
  272. content_text = json.dumps(data, ensure_ascii=False, indent=2)
  273. return {
  274. "result": {
  275. "content": [
  276. {
  277. "type": "text",
  278. "text": content_text
  279. }
  280. ],
  281. "isError": False
  282. }
  283. }
  284. def create_error_response(self, code: int, message: str) -> Dict[str, Any]:
  285. """创建错误响应"""
  286. return {
  287. "error": {
  288. "code": code,
  289. "message": message
  290. }
  291. }
  292. def safe_json_dump(data: Dict[str, Any]) -> str:
  293. """安全的JSON序列化,确保UTF-8编码"""
  294. try:
  295. return json.dumps(data, ensure_ascii=False, separators=(',', ':'))
  296. except:
  297. # 如果失败,使用ASCII转义
  298. return json.dumps(data, ensure_ascii=True, separators=(',', ':'))
  299. def main():
  300. """主函数 - 修复Stdio通信"""
  301. logger.info("启动MCP Stdio服务器 (修复编码版)...")
  302. server = FixedMCPServer()
  303. # 初始握手消息
  304. init_message = {
  305. "jsonrpc": "2.0",
  306. "id": 1,
  307. "result": {
  308. "protocolVersion": "2024-11-05",
  309. "capabilities": {
  310. "tools": {}
  311. },
  312. "serverInfo": {
  313. "name": "fixed-mcp-server",
  314. "version": "1.0.0"
  315. }
  316. }
  317. }
  318. # 发送初始化响应
  319. try:
  320. sys.stdout.write(safe_json_dump(init_message) + "\n")
  321. sys.stdout.flush()
  322. except Exception as e:
  323. logger.error(f"发送初始化消息失败: {e}")
  324. return
  325. logger.info("MCP服务器已初始化")
  326. # 主循环
  327. line_num = 0
  328. while True:
  329. try:
  330. line = sys.stdin.readline()
  331. if not line:
  332. logger.info("输入流结束")
  333. break
  334. line = line.strip()
  335. line_num += 1
  336. if not line:
  337. continue
  338. logger.info(f"收到第 {line_num} 行: {line[:100]}...")
  339. try:
  340. request = json.loads(line)
  341. logger.info(f"解析请求: {request.get('method', 'unknown')}")
  342. # 处理请求
  343. response = server.handle_request(request)
  344. response["jsonrpc"] = "2.0"
  345. response["id"] = request.get("id")
  346. # 发送响应
  347. response_json = safe_json_dump(response)
  348. sys.stdout.write(response_json + "\n")
  349. sys.stdout.flush()
  350. logger.info(f"发送响应: {response.get('result', response.get('error', {}))}")
  351. except json.JSONDecodeError as e:
  352. logger.error(f"JSON解析错误: {e}")
  353. error_response = {
  354. "jsonrpc": "2.0",
  355. "error": {
  356. "code": -32700,
  357. "message": f"Parse error at line {line_num}"
  358. },
  359. "id": None
  360. }
  361. sys.stdout.write(safe_json_dump(error_response) + "\n")
  362. sys.stdout.flush()
  363. except KeyboardInterrupt:
  364. logger.info("接收到中断信号")
  365. break
  366. except Exception as e:
  367. logger.error(f"未处理的错误: {e}")
  368. break
  369. logger.info("MCP服务器已停止")
  370. if __name__ == "__main__":
  371. main()