Skip to content

fix: cancel platform tasks before adapter shutdown#6147

Open
tsubasakong wants to merge 1 commit intoAstrBotDevs:masterfrom
tsubasakong:fix/6100-delete-qq-official-adapter-cleanly
Open

fix: cancel platform tasks before adapter shutdown#6147
tsubasakong wants to merge 1 commit intoAstrBotDevs:masterfrom
tsubasakong:fix/6100-delete-qq-official-adapter-cleanly

Conversation

@tsubasakong
Copy link

@tsubasakong tsubasakong commented Mar 12, 2026

Fixes #6100.

Modifications / 改动点

  • cancel a platform adapter's run/wrapper tasks before awaiting its terminate() hook

  • document why this order matters for websocket-based adapters such as qq_official

  • add a regression test covering adapters whose shutdown depends on their background task being cancelled first

  • This is NOT a breaking change. / 这不是一个破坏性变更。

Screenshots or Test Results / 运行截图或测试结果

python3.11 -m pytest tests/unit/test_platform_manager.py -q
.                                                                        [100%]
1 passed in 1.29s

git diff --check
# no output

Checklist / 检查清单

  • 😊 如果 PR 中有新加入的功能,已经通过 Issue / 邮件等方式和作者讨论过。/ If there are new features added in the PR, I have discussed it with the authors through issues/emails, etc.
  • 👀 我的更改经过了良好的测试,并已在上方提供了“验证步骤”和“运行截图”。/ My changes have been well-tested, and "Verification Steps" and "Screenshots" have been provided above.
  • 🤓 我确保没有引入新依赖库,或者引入了新依赖库的同时将其添加到了 requirements.txtpyproject.toml 文件相应位置。/ I have ensured that no new dependencies are introduced, OR if new dependencies are introduced, they have been added to the appropriate locations in requirements.txt and pyproject.toml.
  • 😮 我的更改没有引入恶意代码。/ My changes do not introduce malicious code.

Summary by Sourcery

确保在调用适配器特定的终止钩子之前,平台适配器的关闭过程会先停止后台任务,从而避免残留的 WebSocket 客户端。

Bug 修复:

  • 通过在等待 terminate() 之前取消适配器的运行任务和包装任务,防止基于 WebSocket 的平台适配器在关闭期间继续接收事件。

测试:

  • 添加一个回归测试,验证平台管理器在关闭时,会在调用适配器的终止钩子之前取消适配器的运行任务和包装任务。
Original summary in English

Summary by Sourcery

Ensure platform adapter shutdown stops background tasks before invoking adapter-specific terminate hooks to avoid lingering websocket clients.

Bug Fixes:

  • Prevent websocket-based platform adapters from continuing to receive events during shutdown by cancelling their run and wrapper tasks before terminate() is awaited.

Tests:

  • Add a regression test verifying that platform manager shutdown cancels adapter run and wrapper tasks before calling the adapter's terminate hook.

@auto-assign auto-assign bot requested review from Raven95676 and anka-afk March 12, 2026 17:38
@dosubot dosubot bot added the size:M This PR changes 30-99 lines, ignoring generated files. label Mar 12, 2026
@gemini-code-assist
Copy link
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request refines the shutdown mechanism for platform adapters to prevent potential issues where long-running tasks might keep an adapter alive even after its terminate() method is called. By ensuring that all associated tasks are cancelled before the adapter's specific termination logic is executed, the change guarantees a more reliable and complete shutdown, particularly for adapters relying on persistent connections like websockets.

Highlights

  • Task Cancellation Order: Reordered the shutdown process in _terminate_inst_and_tasks to ensure platform run/wrapper tasks are cancelled before the adapter's terminate() method is called.
  • Improved Documentation: Added a detailed comment explaining the importance of this new order, specifically for websocket-based adapters that might otherwise remain active.
  • Regression Test: Introduced a new unit test (test_platform_manager.py) to verify that platform tasks are correctly cancelled prior to adapter termination, preventing potential resource leaks.
Changelog
  • astrbot/core/platform/manager.py
    • Changed the order of operations in _terminate_inst_and_tasks to prioritize cancelling platform tasks before invoking the adapter's terminate() method.
    • Added an explanatory comment detailing the rationale for this change, specifically mentioning websocket-based adapters.
  • tests/unit/test_platform_manager.py
    • Added a new test file containing test_terminate_stops_platform_tasks_before_adapter_shutdown to validate the correct sequence of task cancellation and adapter termination during shutdown.
Activity
  • The pull request addresses and fixes issue [Bug] UI 删除qq官方机器人(websocket)后,仍能调用 #6100.
  • The author, tsubasakong, confirmed that this is not a breaking change.
  • Verification steps and test results (pytest output) were provided, showing 1 passed test.
  • The author confirmed no new dependencies were introduced and no malicious code was added.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@dosubot dosubot bot added the area:platform The bug / feature is about IM platform adapter, such as QQ, Lark, Telegram, WebChat and so on. label Mar 12, 2026
Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request correctly addresses an issue where platform adapter shutdown could hang by ensuring background tasks are cancelled before the adapter's terminate() hook is called. A comprehensive regression test has been added to verify this behavior, which is a great addition. My review includes one suggestion to improve the robustness of the shutdown logic, restoring a guarantee that was present in the original code.

Comment on lines +76 to +89
await self._stop_platform_task(client_id)

if getattr(inst, "terminate", None):
try:
await inst.terminate()
except asyncio.CancelledError:
raise
except Exception as e:
logger.error(
"终止平台适配器失败: client_id=%s, error=%s",
client_id,
e,
)
logger.error(traceback.format_exc())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

While changing the order of operations is correct to fix the bug, removing the try...finally construct makes the shutdown process less robust. If _stop_platform_task fails with an exception, inst.terminate() will not be called, potentially leaving resources held by the adapter unreleased. To maintain the robustness of the original implementation, it's better to wrap _stop_platform_task in its own try...except block. This ensures that an attempt to terminate the instance is made even if stopping the tasks fails.

        try:
            await self._stop_platform_task(client_id)
        except Exception as e:
            logger.error(
                "停止平台任务失败: client_id=%s, error=%s",
                client_id,
                e,
            )
            logger.error(traceback.format_exc())

        if getattr(inst, "terminate", None):
            try:
                await inst.terminate()
            except asyncio.CancelledError:
                raise
            except Exception as e:
                logger.error(
                    "终止平台适配器失败: client_id=%s, error=%s",
                    client_id,
                    e,
                )
                logger.error(traceback.format_exc())

Copy link
Contributor

@sourcery-ai sourcery-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey - 我发现了两个问题,并留下了一些整体性的反馈:

  • 新的单元测试直接访问了 PlatformManager._platform_tasks;如果可能的话,更建议通过公共 API(例如正常的适配器注册 / 启动 / 停止流程)来触发相同行为,这样测试对私有内部实现的耦合会更低。
给 AI Agent 的提示语
Please address the comments from this code review:

## Overall Comments
- The new unit test reaches into `PlatformManager._platform_tasks` directly; if possible, prefer exercising this behavior via the public API (e.g., the normal adapter registration/start/stop flow) so the test is less coupled to private internals.

## Individual Comments

### Comment 1
<location path="astrbot/core/platform/manager.py" line_range="85" />
<code_context>
-                    )
-                    logger.error(traceback.format_exc())
-        finally:
-            await self._stop_platform_task(client_id)
+
+        # Stop the platform run/wrapper tasks before awaiting adapter-specific
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Consider guarding `_stop_platform_task` so adapter-specific `terminate()` still runs if stopping tasks fails.

With the previous `finally` block, `_stop_platform_task` always ran, even if `terminate()` failed. Now, any exception in `_stop_platform_task` will prevent `terminate()` from running. To preserve best-effort adapter cleanup while keeping the new ordering, you could wrap `_stop_platform_task` in a `try/except Exception` that logs the error and still calls `terminate()` afterward.
</issue_to_address>

### Comment 2
<location path="tests/unit/test_platform_manager.py" line_range="37-41" />
<code_context>
+        wrapper=wrapper_task,
+    )
+
+    await asyncio.wait_for(manager._terminate_inst_and_tasks(inst), timeout=1)
+
+    assert inst.run_task_cancelled.is_set()
+    assert run_task.cancelled()
+    assert wrapper_task.cancelled()
</code_context>
<issue_to_address>
**suggestion (testing):** Consider adding separate tests for adapters without `terminate()` and for error paths in `terminate()`.

To better exercise `_terminate_inst_and_tasks`, consider adding:

1. A test where the adapter has no `terminate` attribute, asserting the function still completes and cancels both `run` and `wrapper` tasks.
2. Tests where `terminate()` raises:
   - A generic `Exception`, verifying both tasks are cancelled despite the error.
   - `asyncio.CancelledError`, verifying tasks are cancelled and the cancellation propagates as expected.

These would cover the main edge and error paths of the shutdown logic.

Suggested implementation:

```python
import asyncio
from types import SimpleNamespace

```

```python
    manager = PlatformManager({"platform": [], "platform_settings": {}}, asyncio.Queue())

    #
    # Happy-path: adapter.terminate() exists and shutdown cancels both tasks
    #
    inst = _PlatformThatNeedsTaskStopped()

    run_task = asyncio.create_task(_run_until_cancelled(inst.run_task_cancelled))
    wrapper_task = asyncio.create_task(asyncio.sleep(3600))
    manager._platform_tasks[inst.client_self_id] = PlatformTasks(
        run=run_task,
        wrapper=wrapper_task,
    )

    await asyncio.wait_for(manager._terminate_inst_and_tasks(inst), timeout=1)

    assert inst.run_task_cancelled.is_set()
    assert run_task.cancelled()
    assert wrapper_task.cancelled()

    #
    # Adapter without terminate(): _terminate_inst_and_tasks should still cancel tasks
    #
    inst_no_terminate = _PlatformThatNeedsTaskStopped()
    inst_no_terminate.adapter = SimpleNamespace()  # no terminate attribute

    run_task_no_terminate = asyncio.create_task(
        _run_until_cancelled(inst_no_terminate.run_task_cancelled)
    )
    wrapper_task_no_terminate = asyncio.create_task(asyncio.sleep(3600))
    manager._platform_tasks[inst_no_terminate.client_self_id] = PlatformTasks(
        run=run_task_no_terminate,
        wrapper=wrapper_task_no_terminate,
    )

    await asyncio.wait_for(
        manager._terminate_inst_and_tasks(inst_no_terminate), timeout=1
    )

    assert inst_no_terminate.run_task_cancelled.is_set()
    assert run_task_no_terminate.cancelled()
    assert wrapper_task_no_terminate.cancelled()

    #
    # Adapter.terminate() raises a generic Exception: tasks should still be cancelled
    #
    class _AdapterWithFailingTerminate:
        async def terminate(self) -> None:
            raise Exception("terminate failed")

    inst_terminate_raises = _PlatformThatNeedsTaskStopped()
    inst_terminate_raises.adapter = _AdapterWithFailingTerminate()

    run_task_terminate_raises = asyncio.create_task(
        _run_until_cancelled(inst_terminate_raises.run_task_cancelled)
    )
    wrapper_task_terminate_raises = asyncio.create_task(asyncio.sleep(3600))
    manager._platform_tasks[inst_terminate_raises.client_self_id] = PlatformTasks(
        run=run_task_terminate_raises,
        wrapper=wrapper_task_terminate_raises,
    )

    await asyncio.wait_for(
        manager._terminate_inst_and_tasks(inst_terminate_raises), timeout=1
    )

    assert inst_terminate_raises.run_task_cancelled.is_set()
    assert run_task_terminate_raises.cancelled()
    assert wrapper_task_terminate_raises.cancelled()

    #
    # Adapter.terminate() raises asyncio.CancelledError:
    # verify tasks are cancelled and cancellation propagates as expected
    #
    class _AdapterWithCancelledTerminate:
        async def terminate(self) -> None:
            raise asyncio.CancelledError()

    inst_terminate_cancelled = _PlatformThatNeedsTaskStopped()
    inst_terminate_cancelled.adapter = _AdapterWithCancelledTerminate()

    run_task_terminate_cancelled = asyncio.create_task(
        _run_until_cancelled(inst_terminate_cancelled.run_task_cancelled)
    )
    wrapper_task_terminate_cancelled = asyncio.create_task(asyncio.sleep(3600))
    manager._platform_tasks[inst_terminate_cancelled.client_self_id] = PlatformTasks(
        run=run_task_terminate_cancelled,
        wrapper=wrapper_task_terminate_cancelled,
    )

    with pytest.raises(asyncio.CancelledError):
        await asyncio.wait_for(
            manager._terminate_inst_and_tasks(inst_terminate_cancelled), timeout=1
        )

    assert inst_terminate_cancelled.run_task_cancelled.is_set()
    assert run_task_terminate_cancelled.cancelled()
    assert wrapper_task_terminate_cancelled.cancelled()

```

1. The new `pytest.raises` context requires `pytest` to be imported if it is not already in this file:
   - Add `import pytest` near the top of `tests/unit/test_platform_manager.py`.
2. These scenarios are implemented inside the existing test body. If you want truly separate tests per your review comment, you can:
   - Extract each scenario into its own `async def test_...` function, reusing the bodies of the four sections (happy-path, no-terminate, terminate-raises, terminate-cancelled).
   - Keep the setup pattern (`PlatformManager`, `_PlatformThatNeedsTaskStopped`, and `_run_until_cancelled`) identical to the existing test for consistency.
3. The code assumes that `_terminate_inst_and_tasks` uses an `adapter` attribute on `inst`. If the actual attribute name differs, update `inst_*.adapter = ...` to match the real attribute used in `PlatformManager._terminate_inst_and_tasks`.
</issue_to_address>

Sourcery 对开源项目是免费的——如果你觉得我们的 Review 有帮助,欢迎分享 ✨
帮我变得更有用!请在每条评论上点 👍 或 👎,我会根据你的反馈改进后续的 Review。
Original comment in English

Hey - I've found 2 issues, and left some high level feedback:

  • The new unit test reaches into PlatformManager._platform_tasks directly; if possible, prefer exercising this behavior via the public API (e.g., the normal adapter registration/start/stop flow) so the test is less coupled to private internals.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- The new unit test reaches into `PlatformManager._platform_tasks` directly; if possible, prefer exercising this behavior via the public API (e.g., the normal adapter registration/start/stop flow) so the test is less coupled to private internals.

## Individual Comments

### Comment 1
<location path="astrbot/core/platform/manager.py" line_range="85" />
<code_context>
-                    )
-                    logger.error(traceback.format_exc())
-        finally:
-            await self._stop_platform_task(client_id)
+
+        # Stop the platform run/wrapper tasks before awaiting adapter-specific
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Consider guarding `_stop_platform_task` so adapter-specific `terminate()` still runs if stopping tasks fails.

With the previous `finally` block, `_stop_platform_task` always ran, even if `terminate()` failed. Now, any exception in `_stop_platform_task` will prevent `terminate()` from running. To preserve best-effort adapter cleanup while keeping the new ordering, you could wrap `_stop_platform_task` in a `try/except Exception` that logs the error and still calls `terminate()` afterward.
</issue_to_address>

### Comment 2
<location path="tests/unit/test_platform_manager.py" line_range="37-41" />
<code_context>
+        wrapper=wrapper_task,
+    )
+
+    await asyncio.wait_for(manager._terminate_inst_and_tasks(inst), timeout=1)
+
+    assert inst.run_task_cancelled.is_set()
+    assert run_task.cancelled()
+    assert wrapper_task.cancelled()
</code_context>
<issue_to_address>
**suggestion (testing):** Consider adding separate tests for adapters without `terminate()` and for error paths in `terminate()`.

To better exercise `_terminate_inst_and_tasks`, consider adding:

1. A test where the adapter has no `terminate` attribute, asserting the function still completes and cancels both `run` and `wrapper` tasks.
2. Tests where `terminate()` raises:
   - A generic `Exception`, verifying both tasks are cancelled despite the error.
   - `asyncio.CancelledError`, verifying tasks are cancelled and the cancellation propagates as expected.

These would cover the main edge and error paths of the shutdown logic.

Suggested implementation:

```python
import asyncio
from types import SimpleNamespace

```

```python
    manager = PlatformManager({"platform": [], "platform_settings": {}}, asyncio.Queue())

    #
    # Happy-path: adapter.terminate() exists and shutdown cancels both tasks
    #
    inst = _PlatformThatNeedsTaskStopped()

    run_task = asyncio.create_task(_run_until_cancelled(inst.run_task_cancelled))
    wrapper_task = asyncio.create_task(asyncio.sleep(3600))
    manager._platform_tasks[inst.client_self_id] = PlatformTasks(
        run=run_task,
        wrapper=wrapper_task,
    )

    await asyncio.wait_for(manager._terminate_inst_and_tasks(inst), timeout=1)

    assert inst.run_task_cancelled.is_set()
    assert run_task.cancelled()
    assert wrapper_task.cancelled()

    #
    # Adapter without terminate(): _terminate_inst_and_tasks should still cancel tasks
    #
    inst_no_terminate = _PlatformThatNeedsTaskStopped()
    inst_no_terminate.adapter = SimpleNamespace()  # no terminate attribute

    run_task_no_terminate = asyncio.create_task(
        _run_until_cancelled(inst_no_terminate.run_task_cancelled)
    )
    wrapper_task_no_terminate = asyncio.create_task(asyncio.sleep(3600))
    manager._platform_tasks[inst_no_terminate.client_self_id] = PlatformTasks(
        run=run_task_no_terminate,
        wrapper=wrapper_task_no_terminate,
    )

    await asyncio.wait_for(
        manager._terminate_inst_and_tasks(inst_no_terminate), timeout=1
    )

    assert inst_no_terminate.run_task_cancelled.is_set()
    assert run_task_no_terminate.cancelled()
    assert wrapper_task_no_terminate.cancelled()

    #
    # Adapter.terminate() raises a generic Exception: tasks should still be cancelled
    #
    class _AdapterWithFailingTerminate:
        async def terminate(self) -> None:
            raise Exception("terminate failed")

    inst_terminate_raises = _PlatformThatNeedsTaskStopped()
    inst_terminate_raises.adapter = _AdapterWithFailingTerminate()

    run_task_terminate_raises = asyncio.create_task(
        _run_until_cancelled(inst_terminate_raises.run_task_cancelled)
    )
    wrapper_task_terminate_raises = asyncio.create_task(asyncio.sleep(3600))
    manager._platform_tasks[inst_terminate_raises.client_self_id] = PlatformTasks(
        run=run_task_terminate_raises,
        wrapper=wrapper_task_terminate_raises,
    )

    await asyncio.wait_for(
        manager._terminate_inst_and_tasks(inst_terminate_raises), timeout=1
    )

    assert inst_terminate_raises.run_task_cancelled.is_set()
    assert run_task_terminate_raises.cancelled()
    assert wrapper_task_terminate_raises.cancelled()

    #
    # Adapter.terminate() raises asyncio.CancelledError:
    # verify tasks are cancelled and cancellation propagates as expected
    #
    class _AdapterWithCancelledTerminate:
        async def terminate(self) -> None:
            raise asyncio.CancelledError()

    inst_terminate_cancelled = _PlatformThatNeedsTaskStopped()
    inst_terminate_cancelled.adapter = _AdapterWithCancelledTerminate()

    run_task_terminate_cancelled = asyncio.create_task(
        _run_until_cancelled(inst_terminate_cancelled.run_task_cancelled)
    )
    wrapper_task_terminate_cancelled = asyncio.create_task(asyncio.sleep(3600))
    manager._platform_tasks[inst_terminate_cancelled.client_self_id] = PlatformTasks(
        run=run_task_terminate_cancelled,
        wrapper=wrapper_task_terminate_cancelled,
    )

    with pytest.raises(asyncio.CancelledError):
        await asyncio.wait_for(
            manager._terminate_inst_and_tasks(inst_terminate_cancelled), timeout=1
        )

    assert inst_terminate_cancelled.run_task_cancelled.is_set()
    assert run_task_terminate_cancelled.cancelled()
    assert wrapper_task_terminate_cancelled.cancelled()

```

1. The new `pytest.raises` context requires `pytest` to be imported if it is not already in this file:
   - Add `import pytest` near the top of `tests/unit/test_platform_manager.py`.
2. These scenarios are implemented inside the existing test body. If you want truly separate tests per your review comment, you can:
   - Extract each scenario into its own `async def test_...` function, reusing the bodies of the four sections (happy-path, no-terminate, terminate-raises, terminate-cancelled).
   - Keep the setup pattern (`PlatformManager`, `_PlatformThatNeedsTaskStopped`, and `_run_until_cancelled`) identical to the existing test for consistency.
3. The code assumes that `_terminate_inst_and_tasks` uses an `adapter` attribute on `inst`. If the actual attribute name differs, update `inst_*.adapter = ...` to match the real attribute used in `PlatformManager._terminate_inst_and_tasks`.
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

raise
except Exception as e:
logger.error(
"终止平台适配器失败: client_id=%s, error=%s",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (bug_risk): 建议为 _stop_platform_task 增加保护,以便在停止任务失败时,适配器特定的 terminate() 仍然可以执行。

在之前的 finally 代码块中,即使 terminate() 失败,_stop_platform_task 也总是会被执行。现在,只要 _stop_platform_task 中抛出异常,就会阻止后续的 terminate() 执行。为了在保持新的调用顺序的同时,仍尽可能完成适配器的清理,你可以考虑把 _stop_platform_task 包装在一个 try/except Exception 中,记录错误日志后仍然调用 terminate()

Original comment in English

suggestion (bug_risk): Consider guarding _stop_platform_task so adapter-specific terminate() still runs if stopping tasks fails.

With the previous finally block, _stop_platform_task always ran, even if terminate() failed. Now, any exception in _stop_platform_task will prevent terminate() from running. To preserve best-effort adapter cleanup while keeping the new ordering, you could wrap _stop_platform_task in a try/except Exception that logs the error and still calls terminate() afterward.

Comment on lines +37 to +41
await asyncio.wait_for(manager._terminate_inst_and_tasks(inst), timeout=1)

assert inst.run_task_cancelled.is_set()
assert run_task.cancelled()
assert wrapper_task.cancelled()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (testing): 建议为不带 terminate() 的适配器,以及 terminate() 出错路径分别增加测试用例。

为了更好地覆盖 _terminate_inst_and_tasks,可以考虑增加:

  1. 一个适配器不包含 terminate 属性的测试,断言该函数仍能正常结束,并取消 runwrapper 两个任务。
  2. 若干 terminate() 抛出异常时的测试:
    • 抛出通用 Exception,验证即使出现错误,两个任务仍然被取消;
    • 抛出 asyncio.CancelledError,验证任务被取消且取消状态按预期传播。

这些测试可以覆盖关闭逻辑的主要边界与错误路径。

建议的实现:

import asyncio
from types import SimpleNamespace
    manager = PlatformManager({"platform": [], "platform_settings": {}}, asyncio.Queue())

    #
    # Happy-path: adapter.terminate() exists and shutdown cancels both tasks
    #
    inst = _PlatformThatNeedsTaskStopped()

    run_task = asyncio.create_task(_run_until_cancelled(inst.run_task_cancelled))
    wrapper_task = asyncio.create_task(asyncio.sleep(3600))
    manager._platform_tasks[inst.client_self_id] = PlatformTasks(
        run=run_task,
        wrapper=wrapper_task,
    )

    await asyncio.wait_for(manager._terminate_inst_and_tasks(inst), timeout=1)

    assert inst.run_task_cancelled.is_set()
    assert run_task.cancelled()
    assert wrapper_task.cancelled()

    #
    # Adapter without terminate(): _terminate_inst_and_tasks should still cancel tasks
    #
    inst_no_terminate = _PlatformThatNeedsTaskStopped()
    inst_no_terminate.adapter = SimpleNamespace()  # no terminate attribute

    run_task_no_terminate = asyncio.create_task(
        _run_until_cancelled(inst_no_terminate.run_task_cancelled)
    )
    wrapper_task_no_terminate = asyncio.create_task(asyncio.sleep(3600))
    manager._platform_tasks[inst_no_terminate.client_self_id] = PlatformTasks(
        run=run_task_no_terminate,
        wrapper=wrapper_task_no_terminate,
    )

    await asyncio.wait_for(
        manager._terminate_inst_and_tasks(inst_no_terminate), timeout=1
    )

    assert inst_no_terminate.run_task_cancelled.is_set()
    assert run_task_no_terminate.cancelled()
    assert wrapper_task_no_terminate.cancelled()

    #
    # Adapter.terminate() raises a generic Exception: tasks should still be cancelled
    #
    class _AdapterWithFailingTerminate:
        async def terminate(self) -> None:
            raise Exception("terminate failed")

    inst_terminate_raises = _PlatformThatNeedsTaskStopped()
    inst_terminate_raises.adapter = _AdapterWithFailingTerminate()

    run_task_terminate_raises = asyncio.create_task(
        _run_until_cancelled(inst_terminate_raises.run_task_cancelled)
    )
    wrapper_task_terminate_raises = asyncio.create_task(asyncio.sleep(3600))
    manager._platform_tasks[inst_terminate_raises.client_self_id] = PlatformTasks(
        run=run_task_terminate_raises,
        wrapper=wrapper_task_terminate_raises,
    )

    await asyncio.wait_for(
        manager._terminate_inst_and_tasks(inst_terminate_raises), timeout=1
    )

    assert inst_terminate_raises.run_task_cancelled.is_set()
    assert run_task_terminate_raises.cancelled()
    assert wrapper_task_terminate_raises.cancelled()

    #
    # Adapter.terminate() raises asyncio.CancelledError:
    # verify tasks are cancelled and cancellation propagates as expected
    #
    class _AdapterWithCancelledTerminate:
        async def terminate(self) -> None:
            raise asyncio.CancelledError()

    inst_terminate_cancelled = _PlatformThatNeedsTaskStopped()
    inst_terminate_cancelled.adapter = _AdapterWithCancelledTerminate()

    run_task_terminate_cancelled = asyncio.create_task(
        _run_until_cancelled(inst_terminate_cancelled.run_task_cancelled)
    )
    wrapper_task_terminate_cancelled = asyncio.create_task(asyncio.sleep(3600))
    manager._platform_tasks[inst_terminate_cancelled.client_self_id] = PlatformTasks(
        run=run_task_terminate_cancelled,
        wrapper=wrapper_task_terminate_cancelled,
    )

    with pytest.raises(asyncio.CancelledError):
        await asyncio.wait_for(
            manager._terminate_inst_and_tasks(inst_terminate_cancelled), timeout=1
        )

    assert inst_terminate_cancelled.run_task_cancelled.is_set()
    assert run_task_terminate_cancelled.cancelled()
    assert wrapper_task_terminate_cancelled.cancelled()
  1. 新增的 pytest.raises 上下文需要在文件中导入 pytest(如果还未导入):
    • tests/unit/test_platform_manager.py 顶部添加 import pytest
  2. 目前这些场景都实现在同一个测试函数体内。如果你希望像评审建议那样拆成真正独立的测试,可以:
    • 将每个场景抽取成单独的 async def test_... 函数,复用这四个部分的测试主体(happy-path、no-terminate、terminate-raises、terminate-cancelled)。
    • 保持与现有测试一致的初始化模式(PlatformManager_PlatformThatNeedsTaskStopped_run_until_cancelled)。
  3. 这段代码假设 _terminate_inst_and_tasksinst 上使用的是 adapter 属性。如果真实代码使用的属性名不同,请将 inst_*.adapter = ... 更新为 PlatformManager._terminate_inst_and_tasks 实际使用的属性名。
Original comment in English

suggestion (testing): Consider adding separate tests for adapters without terminate() and for error paths in terminate().

To better exercise _terminate_inst_and_tasks, consider adding:

  1. A test where the adapter has no terminate attribute, asserting the function still completes and cancels both run and wrapper tasks.
  2. Tests where terminate() raises:
    • A generic Exception, verifying both tasks are cancelled despite the error.
    • asyncio.CancelledError, verifying tasks are cancelled and the cancellation propagates as expected.

These would cover the main edge and error paths of the shutdown logic.

Suggested implementation:

import asyncio
from types import SimpleNamespace
    manager = PlatformManager({"platform": [], "platform_settings": {}}, asyncio.Queue())

    #
    # Happy-path: adapter.terminate() exists and shutdown cancels both tasks
    #
    inst = _PlatformThatNeedsTaskStopped()

    run_task = asyncio.create_task(_run_until_cancelled(inst.run_task_cancelled))
    wrapper_task = asyncio.create_task(asyncio.sleep(3600))
    manager._platform_tasks[inst.client_self_id] = PlatformTasks(
        run=run_task,
        wrapper=wrapper_task,
    )

    await asyncio.wait_for(manager._terminate_inst_and_tasks(inst), timeout=1)

    assert inst.run_task_cancelled.is_set()
    assert run_task.cancelled()
    assert wrapper_task.cancelled()

    #
    # Adapter without terminate(): _terminate_inst_and_tasks should still cancel tasks
    #
    inst_no_terminate = _PlatformThatNeedsTaskStopped()
    inst_no_terminate.adapter = SimpleNamespace()  # no terminate attribute

    run_task_no_terminate = asyncio.create_task(
        _run_until_cancelled(inst_no_terminate.run_task_cancelled)
    )
    wrapper_task_no_terminate = asyncio.create_task(asyncio.sleep(3600))
    manager._platform_tasks[inst_no_terminate.client_self_id] = PlatformTasks(
        run=run_task_no_terminate,
        wrapper=wrapper_task_no_terminate,
    )

    await asyncio.wait_for(
        manager._terminate_inst_and_tasks(inst_no_terminate), timeout=1
    )

    assert inst_no_terminate.run_task_cancelled.is_set()
    assert run_task_no_terminate.cancelled()
    assert wrapper_task_no_terminate.cancelled()

    #
    # Adapter.terminate() raises a generic Exception: tasks should still be cancelled
    #
    class _AdapterWithFailingTerminate:
        async def terminate(self) -> None:
            raise Exception("terminate failed")

    inst_terminate_raises = _PlatformThatNeedsTaskStopped()
    inst_terminate_raises.adapter = _AdapterWithFailingTerminate()

    run_task_terminate_raises = asyncio.create_task(
        _run_until_cancelled(inst_terminate_raises.run_task_cancelled)
    )
    wrapper_task_terminate_raises = asyncio.create_task(asyncio.sleep(3600))
    manager._platform_tasks[inst_terminate_raises.client_self_id] = PlatformTasks(
        run=run_task_terminate_raises,
        wrapper=wrapper_task_terminate_raises,
    )

    await asyncio.wait_for(
        manager._terminate_inst_and_tasks(inst_terminate_raises), timeout=1
    )

    assert inst_terminate_raises.run_task_cancelled.is_set()
    assert run_task_terminate_raises.cancelled()
    assert wrapper_task_terminate_raises.cancelled()

    #
    # Adapter.terminate() raises asyncio.CancelledError:
    # verify tasks are cancelled and cancellation propagates as expected
    #
    class _AdapterWithCancelledTerminate:
        async def terminate(self) -> None:
            raise asyncio.CancelledError()

    inst_terminate_cancelled = _PlatformThatNeedsTaskStopped()
    inst_terminate_cancelled.adapter = _AdapterWithCancelledTerminate()

    run_task_terminate_cancelled = asyncio.create_task(
        _run_until_cancelled(inst_terminate_cancelled.run_task_cancelled)
    )
    wrapper_task_terminate_cancelled = asyncio.create_task(asyncio.sleep(3600))
    manager._platform_tasks[inst_terminate_cancelled.client_self_id] = PlatformTasks(
        run=run_task_terminate_cancelled,
        wrapper=wrapper_task_terminate_cancelled,
    )

    with pytest.raises(asyncio.CancelledError):
        await asyncio.wait_for(
            manager._terminate_inst_and_tasks(inst_terminate_cancelled), timeout=1
        )

    assert inst_terminate_cancelled.run_task_cancelled.is_set()
    assert run_task_terminate_cancelled.cancelled()
    assert wrapper_task_terminate_cancelled.cancelled()
  1. The new pytest.raises context requires pytest to be imported if it is not already in this file:
    • Add import pytest near the top of tests/unit/test_platform_manager.py.
  2. These scenarios are implemented inside the existing test body. If you want truly separate tests per your review comment, you can:
    • Extract each scenario into its own async def test_... function, reusing the bodies of the four sections (happy-path, no-terminate, terminate-raises, terminate-cancelled).
    • Keep the setup pattern (PlatformManager, _PlatformThatNeedsTaskStopped, and _run_until_cancelled) identical to the existing test for consistency.
  3. The code assumes that _terminate_inst_and_tasks uses an adapter attribute on inst. If the actual attribute name differs, update inst_*.adapter = ... to match the real attribute used in PlatformManager._terminate_inst_and_tasks.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:platform The bug / feature is about IM platform adapter, such as QQ, Lark, Telegram, WebChat and so on. size:M This PR changes 30-99 lines, ignoring generated files.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Bug] UI 删除qq官方机器人(websocket)后,仍能调用

1 participant