From 8cee3a9a669395d971de06ed13c059428949ba67 Mon Sep 17 00:00:00 2001 From: stefanrammo Date: Thu, 19 Mar 2026 14:31:39 +0200 Subject: [PATCH] Implement find() wait and RECONNECT structure event find() now waits for the target app to appear instead of failing immediately. subscribeToStructure fires RECONNECT (2) when an app reappears after being removed, distinguishing restarts from first-time appearances. Changes: - find() waits indefinitely by default; { timeout: N } limits the wait; { timeout: 0 } preserves the old immediate-fail behavior - find() works without prior root() call (triggers connection internally) - close() is terminal: sets isClosed flag, rejects pending find() waiters, prevents reconnection after close - RECONNECT constant (2) added alongside ADD (1) and REMOVE (0) - Per-app lifecycle via announceApp/unannounceApp: each app is tracked independently, fixing the bug where one sibling disconnecting lost all sibling connections - Subscription keepalive via inactivityResendInterval field (client requests server resend values/events after 120s of no changes, matching C++ client ServicesProtocol::InactivityResendIntervalSec) - Value dedup via lastServerTimestamp and event dedup via recentEventIds to handle server replay after reconnection - Exponential backoff with jitter for reconnection (1s to 30s max) - Stall detection: force reconnect after 150s of server silence with active subscriptions (must exceed 120s keepalive interval) - Direct mode sibling discovery uses server-pushed eStructureChangeResponse instead of client-side polling - CDP_FORCE_DIRECT_MODE=1 env var to force direct mode on proxy-capable servers (for testing) - Cache key fix: use join('.') instead of toString() for correct invalidateApp prefix matching on app structure changes - Remove unused serviceId storage on proxy connections - Fix instanceId serialization: pass through as-is instead of defaulting falsy values to 0 - appAddress() helper for DRY server address construction - README updated with Structure Events section and find() options - Version bump to 3.0.0 (breaking: find() default changed from immediate fail to indefinite wait) CDP-6069 --- README.rst | 79 +++- index.js | 470 ++++++++++++++++++++---- package.json | 2 +- test/duplicate-values-reconnect.test.js | 15 +- test/find-and-structure-events.test.js | 68 ++++ 5 files changed, 551 insertions(+), 83 deletions(-) create mode 100644 test/find-and-structure-events.test.js diff --git a/README.rst b/README.rst index 8a302d8..efb93c5 100644 --- a/README.rst +++ b/README.rst @@ -38,6 +38,8 @@ Example root.subscribeToStructure((name, change) => { if (change === studio.api.structure.ADD) subscribeToApp(name); + if (change === studio.api.structure.RECONNECT) + console.log(`${name} restarted, subscriptions intact`); }); }).catch(err => console.error("Connection failed:", err)); @@ -61,6 +63,41 @@ Benefits - Simplified firewall configuration - only one port needs to be opened - SSH port forwarding - forward a single port to access entire CDP system +Structure Events +---------------- + +On the root node, ``subscribeToStructure`` tracks application lifecycle with three event types: + +- ``studio.api.structure.ADD`` (1) — An application appeared for the first time +- ``studio.api.structure.REMOVE`` (0) — An application went offline +- ``studio.api.structure.RECONNECT`` (2) — An application restarted (was seen before, went offline, came back) + +On other nodes, ADD and REMOVE fire when children are added or removed at runtime. +RECONNECT only fires at the root level. + +When an app restarts, the client automatically restores value and event subscriptions, +so user code does not need to re-subscribe. RECONNECT is informational — use it for +logging or UI updates. + + .. code:: javascript + + client.root().then(root => { + root.subscribeToStructure((appName, change) => { + if (change === studio.api.structure.ADD) { + console.log(`New app online: ${appName}`); + client.find(appName + '.CPULoad').then(node => { + node.subscribeToValues(v => console.log(`[${appName}] CPULoad: ${v}`)); + }).catch(err => console.error(`Failed to find ${appName}.CPULoad:`, err)); + } + if (change === studio.api.structure.REMOVE) { + console.log(`App offline: ${appName}`); + } + if (change === studio.api.structure.RECONNECT) { + console.log(`App restarted: ${appName}, subscriptions intact`); + } + }); + }).catch(err => console.error("Connection failed:", err)); + API --- @@ -325,31 +362,46 @@ client.root() // use the system INode object to access connected structure. }); -client.find(path) -^^^^^^^^^^^^^^^^^ +client.find(path, options) +^^^^^^^^^^^^^^^^^^^^^^^^^ - Arguments - path - Path of the object to look for. + path - Dot-separated path to target node (e.g. ``'App2.CPULoad'``). + + options - Optional. Without this, find() waits indefinitely for the app to appear. + ``{ timeout: milliseconds }`` to limit wait time. + ``{ timeout: 0 }`` to fail immediately if the app is not available. - Returns Promise containing requested INode object when fulfilled. -- Restriction +- Behavior - The requested node must reside in the application client was connected to. + When no timeout option is provided, waits indefinitely for the target application to appear. + If the application is already available, resolves immediately. No prior ``root()`` call is needed — ``find()`` triggers + the connection internally. -- Usage +- Examples - The provided path must contain dot separated path to target node. **Root node is not considered part of the path.** + .. code:: javascript -- Example + // Waits indefinitely for App2 to appear + client.find("App2.CPULoad").then(function (load) { + load.subscribeToValues(function (value) { + console.log("CPULoad:", value); + }); + }); - .. code:: javascript + // Wait up to 5 seconds + client.find("App2.CPULoad", { timeout: 5000 }).catch(function (err) { + console.log(err.message); // "App2 not found within 5000ms" + }); - client.find("MyApp.CPULoad").then(function (load) { - // use the load object referring to CPULoad in MyApp + // Fail immediately if not available (old behavior) + client.find("App2.CPULoad", { timeout: 0 }).catch(function (err) { + console.log("Not available right now"); }); client.close() @@ -604,8 +656,9 @@ node.subscribeToStructure(structureConsumer) - Usage - Subscribe to structure changes on this node. Each time child is added or removed from current node - structureConsumer function is called with the name of the node and change argument where ADD == 1 and REMOVE == 0. + Subscribe to structure changes on this node. Each time a child is added or removed, + structureConsumer is called with the child name and change (ADD == 1, REMOVE == 0). + On the root node, RECONNECT (2) fires when a previously-seen application restarts. node.unsubscribeFromStructure(structureConsumer) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/index.js b/index.js index 0cbc838..f7cc9cc 100644 --- a/index.js +++ b/index.js @@ -242,7 +242,7 @@ studio.protocol = (function(ProtoBuf) { } // Minimum compat version required for proxy protocol support - var PROXY_MIN_COMPAT_VERSION = 4; + const PROXY_MIN_COMPAT_VERSION = 4; obj.PROXY_MIN_COMPAT_VERSION = PROXY_MIN_COMPAT_VERSION; // Create encoded ServicesRequest container bytes for proxy protocol @@ -398,8 +398,8 @@ studio.protocol = (function(ProtoBuf) { resolve(new ContainerHandler(onContainer, onError, metadata)); } }) - .catch(function(){ - console.log("Application acceptance denied.") + .catch(function(err){ + console.error("Application acceptance failed:", err); resolve(this); }.bind(this)); }.bind(this)); @@ -455,9 +455,17 @@ studio.internal = (function(proto) { obj.structure = { REMOVE: 0, - ADD: 1 + ADD: 1, + RECONNECT: 2 }; + const STRUCTURE_REQUEST_TIMEOUT_MS = 30000; + const MAX_RECONNECT_DELAY_MS = 30000; + const INITIAL_RECONNECT_DELAY_MS = 1000; + const INACTIVITY_RESEND_INTERVAL_S = 120; + const EVENT_HISTORY_WAIT_NS = 120e9; // 120s — accept old events during this window after subscribe + const EVENT_DEDUP_WINDOW_NS = 30e9; // 30s — trim entries older than this after history wait + // Helper to remove first matching item from array (shared by AppNode and SystemNode) function removeFirst(array, predicate) { var idx = array.findIndex(predicate); @@ -479,6 +487,8 @@ studio.internal = (function(proto) { var lastInfo = null; //when we get this, if there are any child requests we need to fetch child fetch too var valid = true; var hasActiveValueSubscription = false; // track if we've sent a getter request to server + var lastServerTimestamp = null; // for value dedup after reconnect + var lastEventTimestamp = 0; // track last received event timestamp for reconnect resume this.path = function() { var path = ""; @@ -514,6 +524,7 @@ studio.internal = (function(proto) { givenPromises.forEach(function(promiseHandlers, apiNode) { promiseHandlers.forEach(function(promiseHandler) { try { + clearTimeout(promiseHandler.timer); promiseHandler.reject(apiNode); } catch (e) { /* ignore */ } }); @@ -522,7 +533,7 @@ studio.internal = (function(proto) { }; this.hasSubscriptions = function() { - return valueSubscriptions.length > 0; + return valueSubscriptions.length > 0 || eventSubscriptions.length > 0; }; this.info = function() { @@ -552,9 +563,17 @@ studio.internal = (function(proto) { parent = nodeParent; lastInfo = protoInfo; id = protoInfo.nodeId; + // Keep lastServerTimestamp across reconnect — filters the server's + // cached last-known-value replay (which has the original timestamp). + // Per-listener recentEvents preserved across reconnect — the server + // replays from lastEventTimestamp (inclusive), and preserving each + // listener's cache prevents boundary events from being duplicated. this.async._makeGetterRequest(); - for (var i = 0; i < eventSubscriptions.length; i++) - app.makeEventRequest(id, eventSubscriptions[i][1], false); + for (var i = 0; i < eventSubscriptions.length; i++) { + // Resume from last received event timestamp (not original startingFrom) to avoid duplicates + var resumeFrom = lastEventTimestamp > 0 ? lastEventTimestamp : eventSubscriptions[i].startingFrom; + app.makeEventRequest(id, resumeFrom, false); + } }; this.add = function(node) { @@ -579,7 +598,6 @@ studio.internal = (function(proto) { this.done = function() { structureFetched = true; valid = true; // Re-validate node when structure is successfully fetched - //Call process node requests from childRequests givenPromises.forEach(function (promiseHandlers, apiNode) { promiseHandlers.forEach(function(promiseHandler) { if (apiNode.isValid()) { @@ -598,6 +616,16 @@ studio.internal = (function(proto) { }; this.receiveValue = function (nodeValue, nodeTimestamp) { + // Skip values with older timestamps (filters reconnect replay where + // the server resends the last known value with its original timestamp). + if (nodeTimestamp !== undefined) { + var ts = Number(nodeTimestamp); + if (ts > 0) { + if (lastServerTimestamp !== null && ts < lastServerTimestamp) + return; + lastServerTimestamp = ts; + } + } lastValue = nodeValue; for (var i = 0; i < valueSubscriptions.length; i++) { valueSubscriptions[i][0](nodeValue, nodeTimestamp); @@ -605,8 +633,43 @@ studio.internal = (function(proto) { }; this.receiveEvent = function (event) { + // Per-listener event dedup — each subscriber has independent dedup state. + var hasId = event.id !== undefined; + var ts = event.timestamp !== undefined ? Number(event.timestamp) : undefined; + var eventKey = hasId ? String(event.id) : null; + var nowNs = hasId ? Date.now() * 1e6 : 0; + if (ts !== undefined && ts > lastEventTimestamp) lastEventTimestamp = ts; for (var i = 0; i < eventSubscriptions.length; i++) { - eventSubscriptions[i][0](event); + var sub = eventSubscriptions[i]; + if (hasId && ts !== undefined) { + var isPastHistoryWait = nowNs > sub.dedupStartNs + EVENT_HISTORY_WAIT_NS; + // Reject events older than lowest tracked timestamp after history wait. + // When Map is empty (lowestTs 0), any event passes (nothing to compare against). + var lowestTs = sub.recentEvents.size > 0 ? sub.lowestTs : 0; + if (ts < lowestTs && isPastHistoryWait) continue; + // Exact (eventId, timestamp) duplicate check using composite key. + // Composite key allows multiple timestamps per eventId (recurring alarms). + var compositeKey = eventKey + ':' + event.timestamp; + if (sub.recentEvents.has(compositeKey)) continue; + sub.recentEvents.set(compositeKey, ts); + if (sub.recentEvents.size === 1 || ts < sub.lowestTs) sub.lowestTs = ts; + // Trim expired entries after history wait. At high event rates, the adaptive + // threshold (trimAt) amortizes the O(n) scan. At low rates (below threshold), + // check if the oldest entry has expired to keep the dedup window accurate. + if (isPastHistoryWait) { + var cutoff = ts - EVENT_DEDUP_WINDOW_NS; + if (sub.recentEvents.size > sub.trimAt + || (sub.recentEvents.size <= sub.trimAt && sub.lowestTs > 0 && sub.lowestTs < cutoff)) { + sub.lowestTs = 0; + sub.recentEvents.forEach(function(storedTs, key) { + if (storedTs < cutoff) sub.recentEvents.delete(key); + else if (sub.lowestTs === 0 || storedTs < sub.lowestTs) sub.lowestTs = storedTs; + }); + sub.trimAt = Math.max(100, sub.recentEvents.size * 2); + } + } + } + sub.callback(event); } }; @@ -618,7 +681,23 @@ studio.internal = (function(proto) { if (!givenPromises.has(apiNode)) { givenPromises.set(apiNode, []); } - givenPromises.get(apiNode).push({resolve: resolve, reject: reject}); + var settled = false; + var entry = { + resolve: function(v) { if (!settled) { settled = true; clearTimeout(entry.timer); resolve(v); } }, + reject: function(v) { if (!settled) { settled = true; clearTimeout(entry.timer); reject(v); } }, + timer: setTimeout(function() { + if (!settled) { + settled = true; + // Remove from givenPromises to prevent double-fire + var arr = givenPromises.get(apiNode); + var idx = arr.indexOf(entry); + if (idx >= 0) arr.splice(idx, 1); + if (arr.length === 0) givenPromises.delete(apiNode); + reject(new Error("Structure request timed out after " + STRUCTURE_REQUEST_TIMEOUT_MS + "ms")); + } + }, STRUCTURE_REQUEST_TIMEOUT_MS) + }; + givenPromises.get(apiNode).push(entry); } else { if (apiNode.isValid()) { resolve(apiNode); @@ -652,12 +731,31 @@ studio.internal = (function(proto) { }; this.async.subscribeToEvents = function(eventConsumer, startingFrom) { - eventSubscriptions.push([eventConsumer, startingFrom]); + var existing = null; + for (var si = 0; si < eventSubscriptions.length; si++) { + if (eventSubscriptions[si].callback === eventConsumer) { existing = eventSubscriptions[si]; break; } + } + if (existing) { + existing.startingFrom = startingFrom; + existing.recentEvents = new Map(); + existing.lowestTs = 0; + existing.trimAt = 100; + existing.dedupStartNs = Date.now() * 1e6; + } else { + eventSubscriptions.push({ + callback: eventConsumer, + startingFrom: startingFrom, + recentEvents: new Map(), + lowestTs: 0, + trimAt: 100, + dedupStartNs: Date.now() * 1e6 + }); + } app.makeEventRequest(id, startingFrom, false); }; this.async.unsubscribeFromEvents = function(eventConsumer) { - removeFirst(eventSubscriptions, function(s) { return s[0] === eventConsumer; }); + removeFirst(eventSubscriptions, function(s) { return s.callback === eventConsumer; }); if (eventSubscriptions.length === 0) app.makeEventRequest(id, 0, true); }; @@ -699,9 +797,13 @@ studio.internal = (function(proto) { var connected = false; var connecting = false; var connectGeneration = 0; + var isClosed = false; var structureSubscriptions = []; var announcedApps = new Set(); + var everSeenApps = new Set(); + var pendingFindWaiters = []; // for find() waiting on late apps var pendingFetches = []; + var connectionLocalApps = new Map(); // Maps AppConnection → local app name (direct mode) var this_ = this; function isApplicationNode(node) { @@ -709,6 +811,10 @@ studio.internal = (function(proto) { return info && info.isLocal && info.nodeType === proto.CDPNodeType.CDP_APPLICATION; } + function appAddress(info) { + return info.serverAddr + ':' + info.serverPort; + } + function notifyStructure(name, change) { // Notify internal cache invalidation callback (constructor param) if (onStructureChange) { @@ -728,12 +834,85 @@ studio.internal = (function(proto) { }); } + // Wake up find() callers waiting for a specific app to appear + function notifyFindWaiters(appName) { + pendingFindWaiters = pendingFindWaiters.filter(function(waiter) { + if (waiter.appName === appName) { + waiter.resolve(); + return false; // remove from list + } + return true; + }); + } + + // Check if an app is currently connected and announced + this.isAppAvailable = function(appName) { + return announcedApps.has(appName); + }; + + // Check if an app was ever seen (used to distinguish "not yet discovered" from "disconnected") + this.wasAppSeen = function(appName) { + return everSeenApps.has(appName); + }; + + // Check if client is in direct mode (no proxy protocol) + this.isDirectMode = function() { + return appConnections.length > 0 && !appConnections[0].supportsProxyProtocol(); + }; + + // Request a structure refresh from the primary connection (direct mode only). + // In proxy mode, discovery is push-based via ServicesNotification so this is a no-op. + function requestStructureRefresh() { + if (appConnections[0] && !appConnections[0].supportsProxyProtocol()) { + appConnections[0].makeStructureRequest(0); + } + } + + // Register a waiter for a specific app name, with timeout + this.waitForApp = function(appName, timeoutMs) { + // Check if already available + if (announcedApps.has(appName)) { + return Promise.resolve(); + } + + // In direct mode, ask the server now to trigger immediate discovery. + // After this, the server will push eStructureChangeResponse (id 0) + // when siblings start/stop. + requestStructureRefresh(); + + return new Promise(function(resolve, reject) { + var waiter = { appName: appName }; + var timer = timeoutMs > 0 ? setTimeout(function() { + pendingFindWaiters = pendingFindWaiters.filter(function(w) { return w !== waiter; }); + reject(new Error(appName + " not found within " + timeoutMs + "ms")); + }, timeoutMs) : null; + waiter.resolve = function() { clearTimeout(timer); resolve(); }; + waiter.reject = function(err) { clearTimeout(timer); reject(err); }; + pendingFindWaiters.push(waiter); + }); + }; + + // Announce an app as ADD (first time) or RECONNECT (seen before). + // No-op if already announced or not a valid application node. + function announceApp(appName, node) { + if (announcedApps.has(appName)) return; + if (!everSeenApps.has(appName) && (!node || !isApplicationNode(node))) return; + var change = everSeenApps.has(appName) ? obj.structure.RECONNECT : obj.structure.ADD; + announcedApps.add(appName); + everSeenApps.add(appName); + notifyStructure(appName, change); + notifyFindWaiters(appName); + } + + function unannounceApp(appName) { + if (!announcedApps.has(appName)) return; + announcedApps.delete(appName); + notifyStructure(appName, obj.structure.REMOVE); + } + function notifyApplications(connection) { connection.root().forEachChild(function (node) { - if (isApplicationNode(node) && !announcedApps.has(node.name())) { - announcedApps.add(node.name()); - notifyStructure(node.name(), obj.structure.ADD); - } + announceApp(node.name(), node); }); } @@ -741,19 +920,30 @@ studio.internal = (function(proto) { var sys = connection.root(); sys.async.onDone(function (system) { notifyApplications(connection); - // Subscribe to structure changes to propagate app ADD/REMOVE at runtime - system.async.subscribeToStructure(function(appName, change) { - var node = system.child(appName); - if (change === obj.structure.ADD && node && isApplicationNode(node)) { - if (!announcedApps.has(appName)) { - announcedApps.add(appName); - notifyStructure(appName, obj.structure.ADD); + + var primaryConn = appConnections[0]; + var isProxyMode = primaryConn && primaryConn.supportsProxyProtocol(); + + if (isProxyMode) { + // Proxy mode: only handle REMOVE here. ADD/RECONNECT is deferred to + // notifyApplications() after the proxy tunnel connects (via + // tryConnectPendingSiblings → connectViaProxy), ensuring the sibling + // is actually reachable before announcing it. + system.async.subscribeToStructure(function(appName, change) { + if (change === obj.structure.REMOVE) { + unannounceApp(appName); } - } else if (change === obj.structure.REMOVE && announcedApps.has(appName)) { - announcedApps.delete(appName); - notifyStructure(appName, obj.structure.REMOVE); - } - }); + }); + } else { + // Direct mode: each connection owns its local app. + // Connection lifecycle directly maps to app lifecycle. + system.forEachChild(function(app) { + if (isApplicationNode(app)) { + connectionLocalApps.set(connection, app.name()); + } + }); + } + resolve(system); }, reject, sys); } @@ -762,6 +952,24 @@ studio.internal = (function(proto) { return new Promise(function (resolve, reject) { var appConnection = new obj.AppConnection(url, notificationListener, autoConnect); appConnections.push(appConnection); + + // Direct mode lifecycle: connection close → REMOVE, reconnect → RECONNECT + appConnection.onDisconnected = function() { + var localApp = connectionLocalApps.get(appConnection); + if (localApp) unannounceApp(localApp); + }; + appConnection.onReconnected = function() { + if (!connectionLocalApps.has(appConnection)) { + // Initial registration may have timed out — populate now + appConnection.root().forEachChild(function(app) { + if (isApplicationNode(app)) { + connectionLocalApps.set(appConnection, app.name()); + } + }); + } + notifyApplications(appConnection); + }; + appConnection.onServiceConnectionEstablished = function(serviceConnection, instanceKey) { serviceConnection.instanceKey = instanceKey; appConnections.push(serviceConnection); @@ -774,9 +982,8 @@ studio.internal = (function(proto) { connectedSiblings.delete(con.siblingKey); } con.root().forEachChild(function(node) { - if (isApplicationNode(node) && announcedApps.has(node.name())) { - announcedApps.delete(node.name()); - notifyStructure(node.name(), obj.structure.REMOVE); + if (isApplicationNode(node)) { + unannounceApp(node.name()); } }); if (closedByUser) { @@ -816,6 +1023,10 @@ studio.internal = (function(proto) { } this.onConnect = function(resolve, reject, autoConnect) { + if (isClosed) { + reject(new Error("Client has been closed")); + return; + } if (connected) { resolve(this_); return; @@ -841,17 +1052,38 @@ studio.internal = (function(proto) { system.forEachChild(function (app) { if (!app.info().isLocal) { - var appUrl = app.info().serverAddr + ":" + app.info().serverPort; + var appUrl = appAddress(app.info()); promises.push(this_.onAppConnect(appUrl, notificationListener, autoConnect)); } }); + // Watch primary connection's structure for new siblings (direct mode) + system.async.subscribeToStructure(function(appName, change) { + if (change === obj.structure.ADD) { + var app = system.child(appName); + if (app && app.info() && !app.info().isLocal) { + // New sibling discovered — check we don't already have a connection + var alreadyConnected = Array.from(connectionLocalApps.values()).indexOf(appName) >= 0; + if (!alreadyConnected) { + var appUrl = appAddress(app.info()); + this_.onAppConnect(appUrl, notificationListener, autoConnect).catch(function(err) { + console.error("Failed to connect to sibling " + appName + ":", err); + }); + } + } + } + }); + // Direct mode discovery: waitForApp() and subscribeToStructure() + // trigger structure refreshes on demand. } else { system.forEachChild(function (app) { if (!app.info().isLocal) { - knownSiblings.add(app.info().serverAddr + ':' + app.info().serverPort); + knownSiblings.add(appAddress(app.info())); } }); + // Separate from registerConnection's structure subscription (which handles + // user-facing app lifecycle via announceApp/unannounceApp). This one manages + // proxy connection establishment when new siblings appear. system.async.subscribeToStructure(function(appName, change) { if (change === obj.structure.ADD) { var app = system.child(appName); @@ -859,7 +1091,7 @@ studio.internal = (function(proto) { if (app && appInfo) { if (!appInfo.isLocal) { // Remote sibling - track for proxy connection - knownSiblings.add(appInfo.serverAddr + ':' + appInfo.serverPort); + knownSiblings.add(appAddress(appInfo)); tryConnectPendingSiblings(primaryConnection); } else { // Local sibling came back - re-fetch and resubscribe through primary connection @@ -874,7 +1106,7 @@ studio.internal = (function(proto) { system.forEachChild(function (app) { var appInfo = app.info(); if (appInfo && !appInfo.isLocal) { - knownSiblings.add(appInfo.serverAddr + ':' + appInfo.serverPort); + knownSiblings.add(appAddress(appInfo)); } }); tryConnectPendingSiblings(primaryConnection); @@ -1006,6 +1238,10 @@ studio.internal = (function(proto) { // Only fire callbacks for NEW nodes, not existing ones. // Use forEachChild() to iterate existing children. structureSubscriptions.push(structureConsumer); + // Trigger an initial structure refresh to discover current state. + // After this, the server pushes eStructureChangeResponse (id 0) + // when siblings start/stop. + requestStructureRefresh(); }; this.async.unsubscribeFromStructure = function(structureConsumer) { @@ -1045,6 +1281,7 @@ studio.internal = (function(proto) { * Close all connections managed by this system node. */ this.close = function() { + isClosed = true; connectGeneration++; var err = new Error('Connection closed'); pendingConnects.forEach(function(con) { @@ -1068,6 +1305,14 @@ studio.internal = (function(proto) { knownSiblings.clear(); connectedSiblings.clear(); announcedApps.clear(); + everSeenApps.clear(); + connectionLocalApps.clear(); + // Reject all pending find waiters + pendingFindWaiters.forEach(function(waiter) { + waiter.reject(new Error("Client closed")); + }); + pendingFindWaiters = []; + structureSubscriptions = []; }; }; @@ -1136,7 +1381,15 @@ studio.internal = (function(proto) { var reconnectTimeoutId = null; var currentMetadata = null; var closedIntentionally = false; // Set by close() to prevent reconnection - var SERVICES_TIMEOUT_MS = 150000; // 120s resend interval + 30s buffer + var hasNotifiedDisconnect = false; // Guard for onDisconnected lifecycle callback + var hasConnectedBefore = false; // Distinguishes initial connect from reconnect + const SERVICES_TIMEOUT_MS = (INACTIVITY_RESEND_INTERVAL_S + 30) * 1000; + var reconnectDelayMs = INITIAL_RECONNECT_DELAY_MS; + var lastServerMessageTime = 0; // for stall detection + var stallCheckIntervalId = null; + const STALL_CHECK_INTERVAL_MS = 15000; + const STALL_TIMEOUT_MS = (typeof process !== 'undefined' && process.env && Number(process.env.CDP_STALL_TIMEOUT_MS)) + || (INACTIVITY_RESEND_INTERVAL_S + 30) * 1000; // must exceed inactivityResendInterval nodeMap.set(systemNode.id(), systemNode); handler.onContainer = handleIncomingContainer; this.resubscribe = function(item) { @@ -1172,23 +1425,29 @@ studio.internal = (function(proto) { return next; } - var CONNECT_TIMEOUT_MS = 30000; // 30 second timeout for connect calls - var PROXY_MIN_COMPAT_VERSION = proto.PROXY_MIN_COMPAT_VERSION; // From protocol namespace + const CONNECT_TIMEOUT_MS = 30000; + const PROXY_MIN_COMPAT_VERSION = proto.PROXY_MIN_COMPAT_VERSION; - // Helper to schedule reconnection - avoids duplicate code in onError and onClosed + // Helper to schedule reconnection with exponential backoff function scheduleReconnect(logMessage) { if (!autoConnect || !isPrimaryConnection || reconnectTimeoutId) return; + var delay = reconnectDelayMs; + // Add jitter (±20%) to prevent thundering herd + var jitter = delay * 0.2 * (2 * Math.random() - 1); + delay = Math.round(delay + jitter); reconnectTimeoutId = setTimeout(function() { reconnectTimeoutId = null; if (!socketTransport) return; // Ensure proxy state is cleaned up (may already be done by onClosed, but // onError can fire without onClose in Node.js — idempotent if already clean) cleanupPrimaryConnectionState(); - console.log(logMessage); + console.log(logMessage + " (backoff: " + delay + "ms)"); socketTransport.reconnect(appUrl, proto.BINARY_TYPE); handler = new proto.Handler(socketTransport, notificationListener); handler.onContainer = handleIncomingContainer; - }, 3000); + }, delay); + // Exponential backoff: double up to 30s max + reconnectDelayMs = Math.min(reconnectDelayMs * 2, MAX_RECONNECT_DELAY_MS); } function removeServiceConnection(instanceKey, closedByUser) { @@ -1271,9 +1530,9 @@ studio.internal = (function(proto) { pendingSends.push(bytes); } }; - transport.close = function() { - if (isClosed) return; // Already closed - transport._closedByUser = true; + transport.close = function(intentional) { + if (isClosed) return; + transport._closedByUser = !!intentional; cleanup(); appConnection.sendServiceMessage(serviceId, instanceId, proto.ServiceMessageKind.eDisconnect); }; @@ -1321,6 +1580,36 @@ studio.internal = (function(proto) { servicesTimeoutId = null; } + // Stall detection: force-close socket if no server messages for STALL_TIMEOUT_MS + // while there are active subscriptions expecting data. Without subscriptions, + // silence is expected and not a stall. + function hasAnyActiveSubscriptions() { + for (var node of nodeMap.values()) { + if (node.hasSubscriptions()) return true; + } + return false; + } + + function startStallDetection() { + if (stallCheckIntervalId) return; + lastServerMessageTime = Date.now(); + stallCheckIntervalId = setInterval(function() { + if (lastServerMessageTime > 0 && hasAnyActiveSubscriptions() && + Date.now() - lastServerMessageTime > STALL_TIMEOUT_MS) { + console.log("Connection stalled: no server messages for " + STALL_TIMEOUT_MS + "ms with active subscriptions, forcing reconnect"); + stopStallDetection(); + socketTransport.close(); + } + }, STALL_CHECK_INTERVAL_MS); + } + + function stopStallDetection() { + clearInterval(stallCheckIntervalId); + stallCheckIntervalId = null; + } + + this._startStallDetection = startStallDetection; + function cleanupPrimaryConnectionState() { if (!isPrimaryConnection) return; // Notify service instances of disconnect @@ -1336,6 +1625,7 @@ studio.internal = (function(proto) { availableServices.clear(); currentMetadata = null; requests = []; + stopStallDetection(); } this.onServicesReceived = function(services, metadata) { @@ -1419,11 +1709,11 @@ studio.internal = (function(proto) { if (existingConnection) { // Reconnect existing transport with new service instance — preserves nodes and callbacks proxyConnection = existingConnection; - proxyConnection.serviceId = newServiceId; var transport = proxyConnection._getTransport(); // Set onopen to trigger handler recreation + resubscribe (the original onopen // was overwritten by the first connectViaProxy call's new-connection handler) transport.onopen = function() { + proxyConnection._startStallDetection(); proxyConnection._triggerReconnect(); }; // Transport.reconnect() allocates a new instance and fires onopen when connected @@ -1436,7 +1726,6 @@ studio.internal = (function(proto) { proxyConnection = new obj.AppConnection(result.transport, notificationListener, autoConnect); proxyConnection.instanceKey = result.instanceKey; proxyConnection.siblingKey = addr + ':' + port; - proxyConnection.serviceId = newServiceId; serviceConnections.set(result.instanceKey, proxyConnection); } @@ -1454,6 +1743,7 @@ studio.internal = (function(proto) { if (!existingConnection) { var transport = proxyConnection._getTransport(); transport.onopen = function() { + proxyConnection._startStallDetection(); if (appConnection.onServiceConnectionEstablished) { appConnection.onServiceConnectionEstablished(proxyConnection, proxyConnection.instanceKey); } @@ -1502,7 +1792,7 @@ studio.internal = (function(proto) { this.sendServiceMessage = function(serviceId, instanceId, kind, payload) { var serviceMessage = proto.ServiceMessage.create({ serviceId: serviceId, - instanceId: instanceId || 0, + instanceId: instanceId, kind: kind }); if (payload) { @@ -1517,6 +1807,8 @@ studio.internal = (function(proto) { // Returns true if proxy protocol is supported (compat >= PROXY_MIN_COMPAT_VERSION) // When true, backends are accessed via ServiceMessage tunneling, not direct connections this.supportsProxyProtocol = function() { + if (typeof process !== 'undefined' && process.env && process.env.CDP_FORCE_DIRECT_MODE === '1') + return false; return currentMetadata && currentMetadata.compatVersion >= PROXY_MIN_COMPAT_VERSION; }; @@ -1531,10 +1823,22 @@ studio.internal = (function(proto) { // Clear any pending reconnect timeout since we're now connected clearTimeout(reconnectTimeoutId); reconnectTimeoutId = null; + reconnectDelayMs = INITIAL_RECONNECT_DELAY_MS; + hasNotifiedDisconnect = false; // Reset disconnect guard for next cycle + startStallDetection(); // Note: For proxy connections, connectViaProxy overwrites transport.onopen - // with _triggerReconnect(), so this handler only fires for the primary connection. + // and calls _startStallDetection() there instead. // Primary handler recreation happens in scheduleReconnect before reconnect(). appConnection.resubscribe(systemNode); + // Notify lifecycle callback after structure refetch completes (not on initial connect) + if (hasConnectedBefore && appConnection.onReconnected) { + systemNode.async.onDone(function() { + appConnection.onReconnected(); + }, function(err) { + console.error("Structure refetch failed on reconnect:", err); + }, systemNode); + } + hasConnectedBefore = true; }; onClosed = function (event) { if (closedIntentionally) return; @@ -1572,6 +1876,13 @@ studio.internal = (function(proto) { console.log("Socket close: " + reason); + // Notify lifecycle callback once per disconnect (not on each reconnection attempt) + if (!hasNotifiedDisconnect && appConnection.onDisconnected) { + hasNotifiedDisconnect = true; + appConnection.onDisconnected(); + } + + stopStallDetection(); clearServicesTimeout(); clearTimeout(reconnectTimeoutId); reconnectTimeoutId = null; @@ -1638,6 +1949,8 @@ studio.internal = (function(proto) { } if (stop) { request.stop = stop; + } else { + request.inactivityResendInterval = INACTIVITY_RESEND_INTERVAL_S; } msg.messageType = proto.ContainerType.eGetterRequest; msg.getterRequest = [request]; @@ -1650,6 +1963,8 @@ studio.internal = (function(proto) { request.nodeId = id; if (stop) { request.stop = stop; + } else { + request.inactivityResendInterval = INACTIVITY_RESEND_INTERVAL_S; } if (startingFrom != undefined) { request.startingFrom = startingFrom; @@ -1847,6 +2162,7 @@ studio.internal = (function(proto) { } function handleIncomingContainer(protoContainer, metadata) { + lastServerMessageTime = Date.now(); // Update stall detection timestamp // Set currentMetadata from Hello message immediately (not just on ServicesNotification) // This ensures supportsProxyProtocol() works before ServicesNotification arrives if (!currentMetadata && metadata) { @@ -1913,20 +2229,17 @@ studio.internal = (function(proto) { * For primary connections, this closes the WebSocket. */ this.close = function() { - // Mark as intentionally closed to prevent reconnection attempts closedIntentionally = true; - // Clear pending reconnect to prevent accessing null socketTransport + stopStallDetection(); clearTimeout(reconnectTimeoutId); reconnectTimeoutId = null; - // Clear services timeout to prevent timer firing after close clearServicesTimeout(); - // Reset auth state to prevent stale state on reconnect reauthRequestPending = false; cleanupPrimaryConnectionState(); if (socketTransport) { var transport = socketTransport; socketTransport = null; // Guard against double-close - transport.close(); + transport.close(true); } }; }; @@ -2118,7 +2431,7 @@ studio.api = (function(internal) { * * @callback structureConsumer * @param {string} node name - * @param {number} REMOVE 0/ADD 1 from studio.api.structure.ADD / studio.api.structure.REMOVE + * @param {number} change - ADD (1), REMOVE (0), or RECONNECT (2) from studio.api.structure */ /** @@ -2234,7 +2547,7 @@ studio.api = (function(internal) { var findNodeCacheInvalidator = null; // Set after findNodeCache is created var system = new internal.SystemNode(studioURL, notificationListener, function(appName) { - // Called when app structure changes (ADD or REMOVE) + // Called on app structure changes (ADD, REMOVE, or RECONNECT) findNodeCacheInvalidator(appName); }); @@ -2263,7 +2576,7 @@ studio.api = (function(internal) { var nodes = {}; function f(promise, nodeName, index, arr) { - var path = arr.slice(0,index+1); + var path = arr.slice(0,index+1).join('.'); if (memoize[path] && !nodes[path]) return memoize[path]; else if (memoize[path] && nodes[path] && nodes[path].isValid()) @@ -2284,8 +2597,7 @@ studio.api = (function(internal) { // Invalidate cache entries for a specific app name (called on structure changes) f.invalidateApp = function(appName) { Object.keys(memoize).forEach(function(key) { - // Key is array converted to string, e.g. "App2" or "App2,CPULoad" - if (key === appName || key.startsWith(appName + ',')) { + if (key === appName || key.startsWith(appName + '.')) { delete memoize[key]; delete nodes[key]; } @@ -2297,14 +2609,46 @@ studio.api = (function(internal) { var findNode = findNodeCache; findNodeCacheInvalidator = findNodeCache.invalidateApp; /** - * Request node with provided path. + * Request node with provided path. Waits indefinitely for the target app + * to appear if it is not yet available. * - * @param nodePath Should contain dot separated path to target node. Note: root node is not considered part of the path. + * @param nodePath Dot-separated path to target node (e.g. 'App2.CPULoad'). + * @param options Optional. { timeout: milliseconds } to limit wait time. + * Use { timeout: 0 } to fail immediately if the app is not available. * @returns {Promise.} A promise containing requested node when fulfilled. */ - this.find = function(nodePath) { - var nodes = nodePath.split("."); - return nodes.reduce(findNode, this.root()); + this.find = function(nodePath, options) { + if (!nodePath) { + return Promise.reject("Child not found: "); + } + var pathParts = nodePath.split("."); + var appName = pathParts[0]; + var self = this; + + function doFind() { + // Reject immediately if the app was previously seen but is now disconnected. + if (system.wasAppSeen(appName) && !system.isAppAvailable(appName)) { + return Promise.reject(appName + " is not available"); + } + return pathParts.reduce(findNode, self.root()); + } + + // timeout: 0 means immediate fail (old behavior) + if (options && options.timeout === 0) { + return doFind(); + } + + // timeout > 0 means wait up to that many ms; no timeout means wait indefinitely + var timeoutMs = (options && options.timeout > 0) ? options.timeout : 0; + + // Trigger connection before waiting — prevents deadlock when find() + // is called without a prior root() call. Surface connection errors + // to the caller instead of swallowing them. + return self.root().then(function() { + return system.waitForApp(appName, timeoutMs); + }).then(function() { + return doFind(); + }); }; this._getAppConnections = function() { diff --git a/package.json b/package.json index 1dccfa9..ac80f85 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "cdp-client", - "version": "2.5.0", + "version": "3.0.0", "description": "A simple Javascript interface for the CDP Studio development platform that allows Javascript applications to interact with", "main": "index.js", "scripts": { diff --git a/test/duplicate-values-reconnect.test.js b/test/duplicate-values-reconnect.test.js index 233ef3a..1c7833b 100644 --- a/test/duplicate-values-reconnect.test.js +++ b/test/duplicate-values-reconnect.test.js @@ -521,9 +521,7 @@ describe('Value Delivery', () => { expect(systemNode.lastValue()).toBe(43.5); }); - test('calling receiveValue twice with same timestamp should still call callback twice (no dedup)', () => { - // This tests that receiveValue is a simple passthrough - it doesn't deduplicate - // Deduplication should happen at a higher level if needed + test('calling receiveValue twice with same value and increasing timestamps delivers both', () => { const transport = new FakeTransport(); const app = new internal.AppConnection(transport, null, false); @@ -532,12 +530,17 @@ describe('Value Delivery', () => { app.root().async.subscribeToValues(valueConsumer, 5, 0); - // Intentionally send same value twice (simulates potential duplicate from reconnect) app.root().receiveValue(100, 1000); - app.root().receiveValue(100, 1000); // Same timestamp! + app.root().receiveValue(100, 1001); // same value, newer timestamp — delivered + expect(callbackCount.count).toBe(2); - // This tests current behavior - if the implementation SHOULD deduplicate, this test documents it doesn't + // Same value with OLDER timestamp — filtered (reconnect replay) + app.root().receiveValue(100, 999); expect(callbackCount.count).toBe(2); + + // Different value with SAME timestamp as last — delivered (not filtered) + app.root().receiveValue(200, 1001); + expect(callbackCount.count).toBe(3); }); }); diff --git a/test/find-and-structure-events.test.js b/test/find-and-structure-events.test.js new file mode 100644 index 0000000..ff2df8f --- /dev/null +++ b/test/find-and-structure-events.test.js @@ -0,0 +1,68 @@ +/** + * find() wait semantics and RECONNECT structure event tests + * + * Unit tests for the new public API surfaces. Tests that require + * a live connection (find() timeout behavior, subscribeToStructure + * RECONNECT events) are covered by the component tests in the + * parent cdp monorepo. + */ + +global.WebSocket = require('ws'); +const studio = require('../index'); + +describe('RECONNECT structure constant', () => { + test('studio.api.structure has ADD, REMOVE, and RECONNECT with correct values', () => { + expect(studio.api.structure.ADD).toBe(1); + expect(studio.api.structure.REMOVE).toBe(0); + expect(studio.api.structure.RECONNECT).toBe(2); + }); + + test('RECONNECT is distinct from ADD and REMOVE', () => { + const values = [studio.api.structure.ADD, studio.api.structure.REMOVE, studio.api.structure.RECONNECT]; + expect(new Set(values).size).toBe(3); + }); +}); + +describe('find() input validation', () => { + test('find() with empty path rejects', async () => { + const client = new studio.api.Client('ws://127.0.0.1:1', {}, false); + await expect(client.find('')).rejects.toContain('not found'); + client.close(); + }); + + test('find() with undefined path rejects', async () => { + const client = new studio.api.Client('ws://127.0.0.1:1', {}, false); + await expect(client.find()).rejects.toContain('not found'); + client.close(); + }); +}); + +describe('close() is terminal', () => { + test('close() can be called multiple times without error', () => { + const client = new studio.api.Client('ws://127.0.0.1:1', {}, false); + expect(() => { + client.close(); + client.close(); + }).not.toThrow(); + }); + + test('root() after close() rejects with closed error', async () => { + const client = new studio.api.Client('ws://127.0.0.1:1', {}, false); + client.close(); + await expect(client.root()).rejects.toThrow(); + }); + + test('find() after close() rejects with closed error', async () => { + const client = new studio.api.Client('ws://127.0.0.1:1', {}, false); + client.close(); + await expect(client.find('App.Signal')).rejects.toThrow(); + }); + + test('close() rejects pending find() waiters', async () => { + const client = new studio.api.Client('ws://127.0.0.1:1', {}, false); + const findPromise = client.find('NonExistentApp.Signal'); + await new Promise(r => setTimeout(r, 50)); + client.close(); + await expect(findPromise).rejects.toThrow(); + }); +});