From 69a7527664d6ddb832dbcfe48b311542f3ae843f Mon Sep 17 00:00:00 2001 From: evgeny Date: Fri, 13 Mar 2026 12:00:22 +0000 Subject: [PATCH 1/2] [ECO-5698] fix: handle normal WebSocket close frames and improve reconnection logic - Added local WebSocket proxy for testing (`WsProxy`) and corresponding tests for immediate reconnection on normal close. - Fixed missing reconnection on server-sent normal WebSocket close frames in `WebSocketTransport`. - Adjusted idle timer handling to avoid accidental reuse. --- ably/transport/websockettransport.py | 13 ++- test/ably/realtime/realtimeconnection_test.py | 104 ++++++++++++++++++ 2 files changed, 114 insertions(+), 3 deletions(-) diff --git a/ably/transport/websockettransport.py b/ably/transport/websockettransport.py index be13d096..ad4f2856 100644 --- a/ably/transport/websockettransport.py +++ b/ably/transport/websockettransport.py @@ -80,7 +80,8 @@ def __init__(self, connection_manager: ConnectionManager, host: str, params: dic def connect(self): headers = HttpUtils.default_headers() query_params = urllib.parse.urlencode(self.params) - ws_url = (f'wss://{self.host}?{query_params}') + scheme = 'wss' if self.options.tls else 'ws' + ws_url = f'{scheme}://{self.host}?{query_params}' log.info(f'connect(): attempting to connect to {ws_url}') self.ws_connect_task = asyncio.create_task(self.ws_connect(ws_url, headers)) self.ws_connect_task.add_done_callback(self.on_ws_connect_done) @@ -124,6 +125,11 @@ async def _handle_websocket_connection(self, ws_url, websocket): if not self.is_disposed: await self.dispose() self.connection_manager.deactivate_transport(err) + else: + # Read loop exited normally (e.g., server sent normal WS close frame) + if not self.is_disposed: + await self.dispose() + self.connection_manager.deactivate_transport() async def on_protocol_message(self, msg): self.on_activity() @@ -284,8 +290,9 @@ async def send(self, message: dict): await self.websocket.send(raw_msg) def set_idle_timer(self, timeout: float): - if not self.idle_timer: - self.idle_timer = Timer(timeout, self.on_idle_timer_expire) + if self.idle_timer: + self.idle_timer.cancel() + self.idle_timer = Timer(timeout, self.on_idle_timer_expire) async def on_idle_timer_expire(self): self.idle_timer = None diff --git a/test/ably/realtime/realtimeconnection_test.py b/test/ably/realtime/realtimeconnection_test.py index f1eb9003..2593eb3e 100644 --- a/test/ably/realtime/realtimeconnection_test.py +++ b/test/ably/realtime/realtimeconnection_test.py @@ -1,6 +1,14 @@ import asyncio import pytest +from websockets import connect as _ws_connect + +try: + # websockets 15+ preferred import + from websockets.asyncio.server import serve as ws_serve +except ImportError: + # websockets 14 and earlier fallback + from websockets.server import serve as ws_serve from ably.realtime.connection import ConnectionEvent, ConnectionState from ably.transport.defaults import Defaults @@ -10,6 +18,68 @@ from test.ably.utils import BaseAsyncTestCase +async def _relay(src, dst): + try: + async for msg in src: + await dst.send(msg) + except Exception: + pass + + +class WsProxy: + """Local WS proxy that forwards to real Ably and lets tests trigger a normal close.""" + + def __init__(self, target_host: str): + self.target_host = target_host + self.server = None + self.port: int | None = None + self._close_event: asyncio.Event | None = None + + async def _handler(self, client_ws): + # Create a fresh event for this connection; signal to drop the connection cleanly + self._close_event = asyncio.Event() + path = client_ws.request.path # e.g. "/?key=...&format=json" + target_url = f"wss://{self.target_host}{path}" + try: + async with _ws_connect(target_url, ping_interval=None) as server_ws: + c2s = asyncio.create_task(_relay(client_ws, server_ws)) + s2c = asyncio.create_task(_relay(server_ws, client_ws)) + close_task = asyncio.create_task(self._close_event.wait()) + try: + await asyncio.wait([c2s, s2c, close_task], return_when=asyncio.FIRST_COMPLETED) + finally: + c2s.cancel() + s2c.cancel() + close_task.cancel() + except Exception: + pass + # After _handler returns the websockets server sends a normal close frame (1000) + + async def close_active_connection(self): + """Trigger a normal WS close (code 1000) on the currently active client connection. + + Signals the handler to exit; the websockets server framework then sends the + close frame automatically when the handler coroutine returns. + """ + if self._close_event: + self._close_event.set() + + @property + def endpoint(self) -> str: + """Endpoint string to pass to AblyRealtime (combine with tls=False).""" + return f"127.0.0.1:{self.port}" + + async def __aenter__(self): + self.server = await ws_serve(self._handler, "127.0.0.1", 0, ping_interval=None) + self.port = self.server.sockets[0].getsockname()[1] + return self + + async def __aexit__(self, *args): + if self.server: + self.server.close() + await self.server.wait_closed() + + class TestRealtimeConnection(BaseAsyncTestCase): @pytest.fixture(autouse=True) async def setup(self): @@ -469,3 +539,37 @@ async def test_queue_messages_defaults_to_true(self): # TO3g: queueMessages defaults to true assert ably.options.queue_messages is True assert ably.connection.connection_manager.options.queue_messages is True + + async def test_normal_ws_close_triggers_immediate_reconnection(self): + """Server normal WS close (code 1000) must trigger immediate reconnection. + + Regression test: ConnectionClosedOK was silently swallowed and deactivate_transport + was never called, leaving the client disconnected until the idle timer fired. + """ + async with WsProxy(self.test_vars["host"]) as proxy: + ably = await TestApp.get_ably_realtime( + disconnected_retry_timeout=500_000, + suspended_retry_timeout=500_000, + tls=False, + endpoint=proxy.endpoint, + ) + + try: + await asyncio.wait_for( + ably.connection.once_async(ConnectionState.CONNECTED), timeout=10 + ) + + # Simulate server sending a normal WS close frame + await proxy.close_active_connection() + + # Must go CONNECTING quickly — not after the 25 s idle timer + await asyncio.wait_for( + ably.connection.once_async(ConnectionState.CONNECTING), timeout=1 + ) + + # Must reconnect immediately — not after the 500 s retry timer + await asyncio.wait_for( + ably.connection.once_async(ConnectionState.CONNECTED), timeout=10 + ) + finally: + await ably.close() From 7cc7742604e40dca1db140a009e93a0a4c15a4e3 Mon Sep 17 00:00:00 2001 From: evgeny Date: Fri, 13 Mar 2026 13:09:37 +0000 Subject: [PATCH 2/2] fix: now first append return full message see: https://ably.atlassian.net/wiki/x/QQDjIQE --- .../realtime/realtimechannelmutablemessages_test.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/test/ably/realtime/realtimechannelmutablemessages_test.py b/test/ably/realtime/realtimechannelmutablemessages_test.py index 047ea3b6..69d2162e 100644 --- a/test/ably/realtime/realtimechannelmutablemessages_test.py +++ b/test/ably/realtime/realtimechannelmutablemessages_test.py @@ -236,7 +236,8 @@ async def test_append_message_with_string_data(self): def on_message(message): messages_received.append(message) - append_received.finish() + if len(messages_received) == 2: + append_received.finish() await channel.subscribe(on_message) @@ -254,15 +255,21 @@ def on_message(message): channel, serial, MessageAction.MESSAGE_UPDATE ) + second_append_result = await channel.append_message(append_message, append_operation) + await append_received.wait() - assert messages_received[0].data == ' appended data' - assert messages_received[0].action == MessageAction.MESSAGE_APPEND + assert messages_received[0].data == 'Initial data appended data' + assert messages_received[0].action == MessageAction.MESSAGE_UPDATE assert appended_message.data == 'Initial data appended data' assert appended_message.version.serial == append_result.version_serial assert appended_message.version.description == 'Appended to message' assert appended_message.serial == serial + assert messages_received[1].data == ' appended data' + assert messages_received[1].action == MessageAction.MESSAGE_APPEND + assert messages_received[1].version.serial == second_append_result.version_serial + async def wait_until_message_with_action_appears(self, channel, serial, action): message: Message | None = None async def check_message_action():