fix: cancel platform tasks before adapter shutdown#6147
fix: cancel platform tasks before adapter shutdown#6147tsubasakong wants to merge 1 commit intoAstrBotDevs:masterfrom
Conversation
Summary of ChangesHello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request refines the shutdown mechanism for platform adapters to prevent potential issues where long-running tasks might keep an adapter alive even after its Highlights
Changelog
Activity
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Code Review
This pull request correctly addresses an issue where platform adapter shutdown could hang by ensuring background tasks are cancelled before the adapter's terminate() hook is called. A comprehensive regression test has been added to verify this behavior, which is a great addition. My review includes one suggestion to improve the robustness of the shutdown logic, restoring a guarantee that was present in the original code.
| await self._stop_platform_task(client_id) | ||
|
|
||
| if getattr(inst, "terminate", None): | ||
| try: | ||
| await inst.terminate() | ||
| except asyncio.CancelledError: | ||
| raise | ||
| except Exception as e: | ||
| logger.error( | ||
| "终止平台适配器失败: client_id=%s, error=%s", | ||
| client_id, | ||
| e, | ||
| ) | ||
| logger.error(traceback.format_exc()) |
There was a problem hiding this comment.
While changing the order of operations is correct to fix the bug, removing the try...finally construct makes the shutdown process less robust. If _stop_platform_task fails with an exception, inst.terminate() will not be called, potentially leaving resources held by the adapter unreleased. To maintain the robustness of the original implementation, it's better to wrap _stop_platform_task in its own try...except block. This ensures that an attempt to terminate the instance is made even if stopping the tasks fails.
try:
await self._stop_platform_task(client_id)
except Exception as e:
logger.error(
"停止平台任务失败: client_id=%s, error=%s",
client_id,
e,
)
logger.error(traceback.format_exc())
if getattr(inst, "terminate", None):
try:
await inst.terminate()
except asyncio.CancelledError:
raise
except Exception as e:
logger.error(
"终止平台适配器失败: client_id=%s, error=%s",
client_id,
e,
)
logger.error(traceback.format_exc())There was a problem hiding this comment.
Hey - 我发现了两个问题,并留下了一些整体性的反馈:
- 新的单元测试直接访问了
PlatformManager._platform_tasks;如果可能的话,更建议通过公共 API(例如正常的适配器注册 / 启动 / 停止流程)来触发相同行为,这样测试对私有内部实现的耦合会更低。
给 AI Agent 的提示语
Please address the comments from this code review:
## Overall Comments
- The new unit test reaches into `PlatformManager._platform_tasks` directly; if possible, prefer exercising this behavior via the public API (e.g., the normal adapter registration/start/stop flow) so the test is less coupled to private internals.
## Individual Comments
### Comment 1
<location path="astrbot/core/platform/manager.py" line_range="85" />
<code_context>
- )
- logger.error(traceback.format_exc())
- finally:
- await self._stop_platform_task(client_id)
+
+ # Stop the platform run/wrapper tasks before awaiting adapter-specific
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Consider guarding `_stop_platform_task` so adapter-specific `terminate()` still runs if stopping tasks fails.
With the previous `finally` block, `_stop_platform_task` always ran, even if `terminate()` failed. Now, any exception in `_stop_platform_task` will prevent `terminate()` from running. To preserve best-effort adapter cleanup while keeping the new ordering, you could wrap `_stop_platform_task` in a `try/except Exception` that logs the error and still calls `terminate()` afterward.
</issue_to_address>
### Comment 2
<location path="tests/unit/test_platform_manager.py" line_range="37-41" />
<code_context>
+ wrapper=wrapper_task,
+ )
+
+ await asyncio.wait_for(manager._terminate_inst_and_tasks(inst), timeout=1)
+
+ assert inst.run_task_cancelled.is_set()
+ assert run_task.cancelled()
+ assert wrapper_task.cancelled()
</code_context>
<issue_to_address>
**suggestion (testing):** Consider adding separate tests for adapters without `terminate()` and for error paths in `terminate()`.
To better exercise `_terminate_inst_and_tasks`, consider adding:
1. A test where the adapter has no `terminate` attribute, asserting the function still completes and cancels both `run` and `wrapper` tasks.
2. Tests where `terminate()` raises:
- A generic `Exception`, verifying both tasks are cancelled despite the error.
- `asyncio.CancelledError`, verifying tasks are cancelled and the cancellation propagates as expected.
These would cover the main edge and error paths of the shutdown logic.
Suggested implementation:
```python
import asyncio
from types import SimpleNamespace
```
```python
manager = PlatformManager({"platform": [], "platform_settings": {}}, asyncio.Queue())
#
# Happy-path: adapter.terminate() exists and shutdown cancels both tasks
#
inst = _PlatformThatNeedsTaskStopped()
run_task = asyncio.create_task(_run_until_cancelled(inst.run_task_cancelled))
wrapper_task = asyncio.create_task(asyncio.sleep(3600))
manager._platform_tasks[inst.client_self_id] = PlatformTasks(
run=run_task,
wrapper=wrapper_task,
)
await asyncio.wait_for(manager._terminate_inst_and_tasks(inst), timeout=1)
assert inst.run_task_cancelled.is_set()
assert run_task.cancelled()
assert wrapper_task.cancelled()
#
# Adapter without terminate(): _terminate_inst_and_tasks should still cancel tasks
#
inst_no_terminate = _PlatformThatNeedsTaskStopped()
inst_no_terminate.adapter = SimpleNamespace() # no terminate attribute
run_task_no_terminate = asyncio.create_task(
_run_until_cancelled(inst_no_terminate.run_task_cancelled)
)
wrapper_task_no_terminate = asyncio.create_task(asyncio.sleep(3600))
manager._platform_tasks[inst_no_terminate.client_self_id] = PlatformTasks(
run=run_task_no_terminate,
wrapper=wrapper_task_no_terminate,
)
await asyncio.wait_for(
manager._terminate_inst_and_tasks(inst_no_terminate), timeout=1
)
assert inst_no_terminate.run_task_cancelled.is_set()
assert run_task_no_terminate.cancelled()
assert wrapper_task_no_terminate.cancelled()
#
# Adapter.terminate() raises a generic Exception: tasks should still be cancelled
#
class _AdapterWithFailingTerminate:
async def terminate(self) -> None:
raise Exception("terminate failed")
inst_terminate_raises = _PlatformThatNeedsTaskStopped()
inst_terminate_raises.adapter = _AdapterWithFailingTerminate()
run_task_terminate_raises = asyncio.create_task(
_run_until_cancelled(inst_terminate_raises.run_task_cancelled)
)
wrapper_task_terminate_raises = asyncio.create_task(asyncio.sleep(3600))
manager._platform_tasks[inst_terminate_raises.client_self_id] = PlatformTasks(
run=run_task_terminate_raises,
wrapper=wrapper_task_terminate_raises,
)
await asyncio.wait_for(
manager._terminate_inst_and_tasks(inst_terminate_raises), timeout=1
)
assert inst_terminate_raises.run_task_cancelled.is_set()
assert run_task_terminate_raises.cancelled()
assert wrapper_task_terminate_raises.cancelled()
#
# Adapter.terminate() raises asyncio.CancelledError:
# verify tasks are cancelled and cancellation propagates as expected
#
class _AdapterWithCancelledTerminate:
async def terminate(self) -> None:
raise asyncio.CancelledError()
inst_terminate_cancelled = _PlatformThatNeedsTaskStopped()
inst_terminate_cancelled.adapter = _AdapterWithCancelledTerminate()
run_task_terminate_cancelled = asyncio.create_task(
_run_until_cancelled(inst_terminate_cancelled.run_task_cancelled)
)
wrapper_task_terminate_cancelled = asyncio.create_task(asyncio.sleep(3600))
manager._platform_tasks[inst_terminate_cancelled.client_self_id] = PlatformTasks(
run=run_task_terminate_cancelled,
wrapper=wrapper_task_terminate_cancelled,
)
with pytest.raises(asyncio.CancelledError):
await asyncio.wait_for(
manager._terminate_inst_and_tasks(inst_terminate_cancelled), timeout=1
)
assert inst_terminate_cancelled.run_task_cancelled.is_set()
assert run_task_terminate_cancelled.cancelled()
assert wrapper_task_terminate_cancelled.cancelled()
```
1. The new `pytest.raises` context requires `pytest` to be imported if it is not already in this file:
- Add `import pytest` near the top of `tests/unit/test_platform_manager.py`.
2. These scenarios are implemented inside the existing test body. If you want truly separate tests per your review comment, you can:
- Extract each scenario into its own `async def test_...` function, reusing the bodies of the four sections (happy-path, no-terminate, terminate-raises, terminate-cancelled).
- Keep the setup pattern (`PlatformManager`, `_PlatformThatNeedsTaskStopped`, and `_run_until_cancelled`) identical to the existing test for consistency.
3. The code assumes that `_terminate_inst_and_tasks` uses an `adapter` attribute on `inst`. If the actual attribute name differs, update `inst_*.adapter = ...` to match the real attribute used in `PlatformManager._terminate_inst_and_tasks`.
</issue_to_address>帮我变得更有用!请在每条评论上点 👍 或 👎,我会根据你的反馈改进后续的 Review。
Original comment in English
Hey - I've found 2 issues, and left some high level feedback:
- The new unit test reaches into
PlatformManager._platform_tasksdirectly; if possible, prefer exercising this behavior via the public API (e.g., the normal adapter registration/start/stop flow) so the test is less coupled to private internals.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The new unit test reaches into `PlatformManager._platform_tasks` directly; if possible, prefer exercising this behavior via the public API (e.g., the normal adapter registration/start/stop flow) so the test is less coupled to private internals.
## Individual Comments
### Comment 1
<location path="astrbot/core/platform/manager.py" line_range="85" />
<code_context>
- )
- logger.error(traceback.format_exc())
- finally:
- await self._stop_platform_task(client_id)
+
+ # Stop the platform run/wrapper tasks before awaiting adapter-specific
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Consider guarding `_stop_platform_task` so adapter-specific `terminate()` still runs if stopping tasks fails.
With the previous `finally` block, `_stop_platform_task` always ran, even if `terminate()` failed. Now, any exception in `_stop_platform_task` will prevent `terminate()` from running. To preserve best-effort adapter cleanup while keeping the new ordering, you could wrap `_stop_platform_task` in a `try/except Exception` that logs the error and still calls `terminate()` afterward.
</issue_to_address>
### Comment 2
<location path="tests/unit/test_platform_manager.py" line_range="37-41" />
<code_context>
+ wrapper=wrapper_task,
+ )
+
+ await asyncio.wait_for(manager._terminate_inst_and_tasks(inst), timeout=1)
+
+ assert inst.run_task_cancelled.is_set()
+ assert run_task.cancelled()
+ assert wrapper_task.cancelled()
</code_context>
<issue_to_address>
**suggestion (testing):** Consider adding separate tests for adapters without `terminate()` and for error paths in `terminate()`.
To better exercise `_terminate_inst_and_tasks`, consider adding:
1. A test where the adapter has no `terminate` attribute, asserting the function still completes and cancels both `run` and `wrapper` tasks.
2. Tests where `terminate()` raises:
- A generic `Exception`, verifying both tasks are cancelled despite the error.
- `asyncio.CancelledError`, verifying tasks are cancelled and the cancellation propagates as expected.
These would cover the main edge and error paths of the shutdown logic.
Suggested implementation:
```python
import asyncio
from types import SimpleNamespace
```
```python
manager = PlatformManager({"platform": [], "platform_settings": {}}, asyncio.Queue())
#
# Happy-path: adapter.terminate() exists and shutdown cancels both tasks
#
inst = _PlatformThatNeedsTaskStopped()
run_task = asyncio.create_task(_run_until_cancelled(inst.run_task_cancelled))
wrapper_task = asyncio.create_task(asyncio.sleep(3600))
manager._platform_tasks[inst.client_self_id] = PlatformTasks(
run=run_task,
wrapper=wrapper_task,
)
await asyncio.wait_for(manager._terminate_inst_and_tasks(inst), timeout=1)
assert inst.run_task_cancelled.is_set()
assert run_task.cancelled()
assert wrapper_task.cancelled()
#
# Adapter without terminate(): _terminate_inst_and_tasks should still cancel tasks
#
inst_no_terminate = _PlatformThatNeedsTaskStopped()
inst_no_terminate.adapter = SimpleNamespace() # no terminate attribute
run_task_no_terminate = asyncio.create_task(
_run_until_cancelled(inst_no_terminate.run_task_cancelled)
)
wrapper_task_no_terminate = asyncio.create_task(asyncio.sleep(3600))
manager._platform_tasks[inst_no_terminate.client_self_id] = PlatformTasks(
run=run_task_no_terminate,
wrapper=wrapper_task_no_terminate,
)
await asyncio.wait_for(
manager._terminate_inst_and_tasks(inst_no_terminate), timeout=1
)
assert inst_no_terminate.run_task_cancelled.is_set()
assert run_task_no_terminate.cancelled()
assert wrapper_task_no_terminate.cancelled()
#
# Adapter.terminate() raises a generic Exception: tasks should still be cancelled
#
class _AdapterWithFailingTerminate:
async def terminate(self) -> None:
raise Exception("terminate failed")
inst_terminate_raises = _PlatformThatNeedsTaskStopped()
inst_terminate_raises.adapter = _AdapterWithFailingTerminate()
run_task_terminate_raises = asyncio.create_task(
_run_until_cancelled(inst_terminate_raises.run_task_cancelled)
)
wrapper_task_terminate_raises = asyncio.create_task(asyncio.sleep(3600))
manager._platform_tasks[inst_terminate_raises.client_self_id] = PlatformTasks(
run=run_task_terminate_raises,
wrapper=wrapper_task_terminate_raises,
)
await asyncio.wait_for(
manager._terminate_inst_and_tasks(inst_terminate_raises), timeout=1
)
assert inst_terminate_raises.run_task_cancelled.is_set()
assert run_task_terminate_raises.cancelled()
assert wrapper_task_terminate_raises.cancelled()
#
# Adapter.terminate() raises asyncio.CancelledError:
# verify tasks are cancelled and cancellation propagates as expected
#
class _AdapterWithCancelledTerminate:
async def terminate(self) -> None:
raise asyncio.CancelledError()
inst_terminate_cancelled = _PlatformThatNeedsTaskStopped()
inst_terminate_cancelled.adapter = _AdapterWithCancelledTerminate()
run_task_terminate_cancelled = asyncio.create_task(
_run_until_cancelled(inst_terminate_cancelled.run_task_cancelled)
)
wrapper_task_terminate_cancelled = asyncio.create_task(asyncio.sleep(3600))
manager._platform_tasks[inst_terminate_cancelled.client_self_id] = PlatformTasks(
run=run_task_terminate_cancelled,
wrapper=wrapper_task_terminate_cancelled,
)
with pytest.raises(asyncio.CancelledError):
await asyncio.wait_for(
manager._terminate_inst_and_tasks(inst_terminate_cancelled), timeout=1
)
assert inst_terminate_cancelled.run_task_cancelled.is_set()
assert run_task_terminate_cancelled.cancelled()
assert wrapper_task_terminate_cancelled.cancelled()
```
1. The new `pytest.raises` context requires `pytest` to be imported if it is not already in this file:
- Add `import pytest` near the top of `tests/unit/test_platform_manager.py`.
2. These scenarios are implemented inside the existing test body. If you want truly separate tests per your review comment, you can:
- Extract each scenario into its own `async def test_...` function, reusing the bodies of the four sections (happy-path, no-terminate, terminate-raises, terminate-cancelled).
- Keep the setup pattern (`PlatformManager`, `_PlatformThatNeedsTaskStopped`, and `_run_until_cancelled`) identical to the existing test for consistency.
3. The code assumes that `_terminate_inst_and_tasks` uses an `adapter` attribute on `inst`. If the actual attribute name differs, update `inst_*.adapter = ...` to match the real attribute used in `PlatformManager._terminate_inst_and_tasks`.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
| raise | ||
| except Exception as e: | ||
| logger.error( | ||
| "终止平台适配器失败: client_id=%s, error=%s", |
There was a problem hiding this comment.
suggestion (bug_risk): 建议为 _stop_platform_task 增加保护,以便在停止任务失败时,适配器特定的 terminate() 仍然可以执行。
在之前的 finally 代码块中,即使 terminate() 失败,_stop_platform_task 也总是会被执行。现在,只要 _stop_platform_task 中抛出异常,就会阻止后续的 terminate() 执行。为了在保持新的调用顺序的同时,仍尽可能完成适配器的清理,你可以考虑把 _stop_platform_task 包装在一个 try/except Exception 中,记录错误日志后仍然调用 terminate()。
Original comment in English
suggestion (bug_risk): Consider guarding _stop_platform_task so adapter-specific terminate() still runs if stopping tasks fails.
With the previous finally block, _stop_platform_task always ran, even if terminate() failed. Now, any exception in _stop_platform_task will prevent terminate() from running. To preserve best-effort adapter cleanup while keeping the new ordering, you could wrap _stop_platform_task in a try/except Exception that logs the error and still calls terminate() afterward.
| await asyncio.wait_for(manager._terminate_inst_and_tasks(inst), timeout=1) | ||
|
|
||
| assert inst.run_task_cancelled.is_set() | ||
| assert run_task.cancelled() | ||
| assert wrapper_task.cancelled() |
There was a problem hiding this comment.
suggestion (testing): 建议为不带 terminate() 的适配器,以及 terminate() 出错路径分别增加测试用例。
为了更好地覆盖 _terminate_inst_and_tasks,可以考虑增加:
- 一个适配器不包含
terminate属性的测试,断言该函数仍能正常结束,并取消run和wrapper两个任务。 - 若干
terminate()抛出异常时的测试:- 抛出通用
Exception,验证即使出现错误,两个任务仍然被取消; - 抛出
asyncio.CancelledError,验证任务被取消且取消状态按预期传播。
- 抛出通用
这些测试可以覆盖关闭逻辑的主要边界与错误路径。
建议的实现:
import asyncio
from types import SimpleNamespace manager = PlatformManager({"platform": [], "platform_settings": {}}, asyncio.Queue())
#
# Happy-path: adapter.terminate() exists and shutdown cancels both tasks
#
inst = _PlatformThatNeedsTaskStopped()
run_task = asyncio.create_task(_run_until_cancelled(inst.run_task_cancelled))
wrapper_task = asyncio.create_task(asyncio.sleep(3600))
manager._platform_tasks[inst.client_self_id] = PlatformTasks(
run=run_task,
wrapper=wrapper_task,
)
await asyncio.wait_for(manager._terminate_inst_and_tasks(inst), timeout=1)
assert inst.run_task_cancelled.is_set()
assert run_task.cancelled()
assert wrapper_task.cancelled()
#
# Adapter without terminate(): _terminate_inst_and_tasks should still cancel tasks
#
inst_no_terminate = _PlatformThatNeedsTaskStopped()
inst_no_terminate.adapter = SimpleNamespace() # no terminate attribute
run_task_no_terminate = asyncio.create_task(
_run_until_cancelled(inst_no_terminate.run_task_cancelled)
)
wrapper_task_no_terminate = asyncio.create_task(asyncio.sleep(3600))
manager._platform_tasks[inst_no_terminate.client_self_id] = PlatformTasks(
run=run_task_no_terminate,
wrapper=wrapper_task_no_terminate,
)
await asyncio.wait_for(
manager._terminate_inst_and_tasks(inst_no_terminate), timeout=1
)
assert inst_no_terminate.run_task_cancelled.is_set()
assert run_task_no_terminate.cancelled()
assert wrapper_task_no_terminate.cancelled()
#
# Adapter.terminate() raises a generic Exception: tasks should still be cancelled
#
class _AdapterWithFailingTerminate:
async def terminate(self) -> None:
raise Exception("terminate failed")
inst_terminate_raises = _PlatformThatNeedsTaskStopped()
inst_terminate_raises.adapter = _AdapterWithFailingTerminate()
run_task_terminate_raises = asyncio.create_task(
_run_until_cancelled(inst_terminate_raises.run_task_cancelled)
)
wrapper_task_terminate_raises = asyncio.create_task(asyncio.sleep(3600))
manager._platform_tasks[inst_terminate_raises.client_self_id] = PlatformTasks(
run=run_task_terminate_raises,
wrapper=wrapper_task_terminate_raises,
)
await asyncio.wait_for(
manager._terminate_inst_and_tasks(inst_terminate_raises), timeout=1
)
assert inst_terminate_raises.run_task_cancelled.is_set()
assert run_task_terminate_raises.cancelled()
assert wrapper_task_terminate_raises.cancelled()
#
# Adapter.terminate() raises asyncio.CancelledError:
# verify tasks are cancelled and cancellation propagates as expected
#
class _AdapterWithCancelledTerminate:
async def terminate(self) -> None:
raise asyncio.CancelledError()
inst_terminate_cancelled = _PlatformThatNeedsTaskStopped()
inst_terminate_cancelled.adapter = _AdapterWithCancelledTerminate()
run_task_terminate_cancelled = asyncio.create_task(
_run_until_cancelled(inst_terminate_cancelled.run_task_cancelled)
)
wrapper_task_terminate_cancelled = asyncio.create_task(asyncio.sleep(3600))
manager._platform_tasks[inst_terminate_cancelled.client_self_id] = PlatformTasks(
run=run_task_terminate_cancelled,
wrapper=wrapper_task_terminate_cancelled,
)
with pytest.raises(asyncio.CancelledError):
await asyncio.wait_for(
manager._terminate_inst_and_tasks(inst_terminate_cancelled), timeout=1
)
assert inst_terminate_cancelled.run_task_cancelled.is_set()
assert run_task_terminate_cancelled.cancelled()
assert wrapper_task_terminate_cancelled.cancelled()- 新增的
pytest.raises上下文需要在文件中导入pytest(如果还未导入):- 在
tests/unit/test_platform_manager.py顶部添加import pytest。
- 在
- 目前这些场景都实现在同一个测试函数体内。如果你希望像评审建议那样拆成真正独立的测试,可以:
- 将每个场景抽取成单独的
async def test_...函数,复用这四个部分的测试主体(happy-path、no-terminate、terminate-raises、terminate-cancelled)。 - 保持与现有测试一致的初始化模式(
PlatformManager、_PlatformThatNeedsTaskStopped和_run_until_cancelled)。
- 将每个场景抽取成单独的
- 这段代码假设
_terminate_inst_and_tasks在inst上使用的是adapter属性。如果真实代码使用的属性名不同,请将inst_*.adapter = ...更新为PlatformManager._terminate_inst_and_tasks实际使用的属性名。
Original comment in English
suggestion (testing): Consider adding separate tests for adapters without terminate() and for error paths in terminate().
To better exercise _terminate_inst_and_tasks, consider adding:
- A test where the adapter has no
terminateattribute, asserting the function still completes and cancels bothrunandwrappertasks. - Tests where
terminate()raises:- A generic
Exception, verifying both tasks are cancelled despite the error. asyncio.CancelledError, verifying tasks are cancelled and the cancellation propagates as expected.
- A generic
These would cover the main edge and error paths of the shutdown logic.
Suggested implementation:
import asyncio
from types import SimpleNamespace manager = PlatformManager({"platform": [], "platform_settings": {}}, asyncio.Queue())
#
# Happy-path: adapter.terminate() exists and shutdown cancels both tasks
#
inst = _PlatformThatNeedsTaskStopped()
run_task = asyncio.create_task(_run_until_cancelled(inst.run_task_cancelled))
wrapper_task = asyncio.create_task(asyncio.sleep(3600))
manager._platform_tasks[inst.client_self_id] = PlatformTasks(
run=run_task,
wrapper=wrapper_task,
)
await asyncio.wait_for(manager._terminate_inst_and_tasks(inst), timeout=1)
assert inst.run_task_cancelled.is_set()
assert run_task.cancelled()
assert wrapper_task.cancelled()
#
# Adapter without terminate(): _terminate_inst_and_tasks should still cancel tasks
#
inst_no_terminate = _PlatformThatNeedsTaskStopped()
inst_no_terminate.adapter = SimpleNamespace() # no terminate attribute
run_task_no_terminate = asyncio.create_task(
_run_until_cancelled(inst_no_terminate.run_task_cancelled)
)
wrapper_task_no_terminate = asyncio.create_task(asyncio.sleep(3600))
manager._platform_tasks[inst_no_terminate.client_self_id] = PlatformTasks(
run=run_task_no_terminate,
wrapper=wrapper_task_no_terminate,
)
await asyncio.wait_for(
manager._terminate_inst_and_tasks(inst_no_terminate), timeout=1
)
assert inst_no_terminate.run_task_cancelled.is_set()
assert run_task_no_terminate.cancelled()
assert wrapper_task_no_terminate.cancelled()
#
# Adapter.terminate() raises a generic Exception: tasks should still be cancelled
#
class _AdapterWithFailingTerminate:
async def terminate(self) -> None:
raise Exception("terminate failed")
inst_terminate_raises = _PlatformThatNeedsTaskStopped()
inst_terminate_raises.adapter = _AdapterWithFailingTerminate()
run_task_terminate_raises = asyncio.create_task(
_run_until_cancelled(inst_terminate_raises.run_task_cancelled)
)
wrapper_task_terminate_raises = asyncio.create_task(asyncio.sleep(3600))
manager._platform_tasks[inst_terminate_raises.client_self_id] = PlatformTasks(
run=run_task_terminate_raises,
wrapper=wrapper_task_terminate_raises,
)
await asyncio.wait_for(
manager._terminate_inst_and_tasks(inst_terminate_raises), timeout=1
)
assert inst_terminate_raises.run_task_cancelled.is_set()
assert run_task_terminate_raises.cancelled()
assert wrapper_task_terminate_raises.cancelled()
#
# Adapter.terminate() raises asyncio.CancelledError:
# verify tasks are cancelled and cancellation propagates as expected
#
class _AdapterWithCancelledTerminate:
async def terminate(self) -> None:
raise asyncio.CancelledError()
inst_terminate_cancelled = _PlatformThatNeedsTaskStopped()
inst_terminate_cancelled.adapter = _AdapterWithCancelledTerminate()
run_task_terminate_cancelled = asyncio.create_task(
_run_until_cancelled(inst_terminate_cancelled.run_task_cancelled)
)
wrapper_task_terminate_cancelled = asyncio.create_task(asyncio.sleep(3600))
manager._platform_tasks[inst_terminate_cancelled.client_self_id] = PlatformTasks(
run=run_task_terminate_cancelled,
wrapper=wrapper_task_terminate_cancelled,
)
with pytest.raises(asyncio.CancelledError):
await asyncio.wait_for(
manager._terminate_inst_and_tasks(inst_terminate_cancelled), timeout=1
)
assert inst_terminate_cancelled.run_task_cancelled.is_set()
assert run_task_terminate_cancelled.cancelled()
assert wrapper_task_terminate_cancelled.cancelled()- The new
pytest.raisescontext requirespytestto be imported if it is not already in this file:- Add
import pytestnear the top oftests/unit/test_platform_manager.py.
- Add
- These scenarios are implemented inside the existing test body. If you want truly separate tests per your review comment, you can:
- Extract each scenario into its own
async def test_...function, reusing the bodies of the four sections (happy-path, no-terminate, terminate-raises, terminate-cancelled). - Keep the setup pattern (
PlatformManager,_PlatformThatNeedsTaskStopped, and_run_until_cancelled) identical to the existing test for consistency.
- Extract each scenario into its own
- The code assumes that
_terminate_inst_and_tasksuses anadapterattribute oninst. If the actual attribute name differs, updateinst_*.adapter = ...to match the real attribute used inPlatformManager._terminate_inst_and_tasks.
Fixes #6100.
Modifications / 改动点
cancel a platform adapter's run/wrapper tasks before awaiting its
terminate()hookdocument why this order matters for websocket-based adapters such as
qq_officialadd a regression test covering adapters whose shutdown depends on their background task being cancelled first
This is NOT a breaking change. / 这不是一个破坏性变更。
Screenshots or Test Results / 运行截图或测试结果
Checklist / 检查清单
requirements.txt和pyproject.toml文件相应位置。/ I have ensured that no new dependencies are introduced, OR if new dependencies are introduced, they have been added to the appropriate locations inrequirements.txtandpyproject.toml.Summary by Sourcery
确保在调用适配器特定的终止钩子之前,平台适配器的关闭过程会先停止后台任务,从而避免残留的 WebSocket 客户端。
Bug 修复:
terminate()之前取消适配器的运行任务和包装任务,防止基于 WebSocket 的平台适配器在关闭期间继续接收事件。测试:
Original summary in English
Summary by Sourcery
Ensure platform adapter shutdown stops background tasks before invoking adapter-specific terminate hooks to avoid lingering websocket clients.
Bug Fixes:
Tests: