streamable_http_client: one concurrent request HTTPStatusError tears down sibling requests

在 MCP Python SDK 中使用 streamable_http_client 进行并发 JSON-RPC 请求(例如并行调用多个 MCP 工具时),当其中一个请求收到服务器返回的 HTTP 非 2xx 状态码(如 400 Bad Request)时,该错误会传播到共享的任务组中,导致其他正

streamable_http_client: one concurrent request HTTPStatusError tears down sibling requests

streamable_http_client: one concurrent request HTTPStatusError tears down sibling requests

快速结论:此问题发生在 MCP Python SDK 的 streamable_http_client 中,当同时发起多个 JSON-RPC 请求时,某个请求收到非 2xx HTTP 响应(如 400)会导致整个传输层被销毁,进而中断其他仍在等待响应的请求。优先排查 SDK 版本,确认是否 v1.x 分支(尤其是 1.27.1 及之前版本),如已使用 main 分支则问题已修复。

问题场景

在 MCP Python SDK 中使用 streamable_http_client 进行并发 JSON-RPC 请求(例如并行调用多个 MCP 工具时),当其中一个请求收到服务器返回的 HTTP 非 2xx 状态码(如 400 Bad Request)时,该错误会传播到共享的任务组中,导致其他正在等待响应的兄弟请求被中断。

报错原文

ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)

File ".../mcp/shared/session.py", line 292, in send_request
  response_or_error = await response_stream_reader.receive()

File ".../anyio/streams/memory.py", line 117, in receive
  return self.receive_nowait()

File ".../anyio/streams/memory.py", line 112, in receive_nowait
  raise WouldBlock

anyio.WouldBlock

原因分析

src/mcp/client/streamable_http.pypost_writer() 函数中,并发请求通过 tg.start_soon(handle_request_async) 作为任务在共享的 TaskGroup 中启动。当其中一个请求的处理器抛出未捕获的异常(如 _handle_reconnection_handle_post_request 中的 raise_for_status())时,该异常会传播到共享的 TaskGroup。根据 anyio TaskGroup 的语义,任何任务中的未处理异常会导致整个 TaskGroup 取消所有兄弟任务。

WouldBlock 异常并非根本原因——它只是被中断的兄弟请求在等待响应流时的表现。

环境排查

  • 确认 MCP Python SDK 版本(特别是是否使用 v1.x 分支,如 v1.27.1 及之前版本)
  • 确认 Python 版本(3.12 及以上)
  • 确认是否存在并行工具调用场景

解决步骤

  1. 优先尝试升级到 main 分支:该 Issue 已在 main 分支(commit 616476f)通过 PR #2047 修复。修复内容为:_handle_post_request 不再调用 raise_for_status();非 2xx 响应被转换为 JSONRPCError 写入读流,从而避免 HTTP 错误逃逸出共享 TaskGroup。
  2. 如果无法升级到 main 分支(如被 v1.x 分支锁定):在 post_writer() 中为每个请求处理器包裹异常隔离代码。将每个 handle_request_async 任务中的异常捕获,并转换为 MCP 错误消息发送到读流,而不是让异常在共享 TaskGroup 中传播:
    async def handle_request_async():
        try:
            if is_resumption:
                await self._handle_resumption_request(ctx)
            else:
                await self._handle_post_request(ctx)
        except Exception as exc:
            logger.debug(f"Request handler isolated error: {exc}")
            if isinstance(ctx.session_message.message, JSONRPCRequest):
                error_data = ErrorData(
                    code=INTERNAL_ERROR,
                    message=f"Request failed: {exc}"
                )
                error_msg = SessionMessage(JSONRPCError(
                    jsonrpc="2.0",
                    id=ctx.session_message.message.id,
                    error=error_data
                ))
                try:
                    await ctx.read_stream_writer.send(error_msg)
                except Exception:
                    pass
  3. 请求 v1.x 分支的 backport:如果必须使用 v1.x 分支,请在 SDK 仓库中提交 backport 请求将 PR #2047 的修复移植到 v1.x 分支。

验证方法

运行包含并发请求的测试场景:同时发起一个“好”请求(正常返回)和一个“坏”请求(返回 HTTP 400)。修复后,“好”请求应仍然能够成功完成,不会因“坏”请求的错误而被中断。

# 验证脚本示例(需修改 URL 为实际服务器地址)
async def call(name):
    try:
        res = await s.send_request(
            CallToolRequest(method="tools/call", params=CallToolRequestParams(name=name, arguments={})),
            CallToolResult,
        )
        return ("ok", res)
    except BaseException as e:
        return ("err", f"{type(e).__name__}: {e}")

async with anyio.create_task_group() as tg:
    tg.start_soon(lambda: set_result("good", await call("good")))
    tg.start_soon(lambda: set_result("bad", await call("bad")))
# 期望:"good" 返回 ("ok", ...),"bad" 返回 ("err", ...)

参考来源

modelcontextprotocol/python-sdk #2604

GamsGo AI

AI 工具推荐

想把多个 AI 模型放在一个入口?

GamsGo AI 集成 ChatGPT、DeepSeek、Gemini、Claude、Midjourney、Veo 等常用模型,适合写作、绘图、视频和日常 AI 工作流。

了解 GamsGo AI

推广链接:通过此链接购买,我可能获得佣金,不影响你的价格。

celebrityanime
celebrityanime
文章: 7645

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注