diff --git a/README.rst b/README.rst index 8a302d8..efb93c5 100644 --- a/README.rst +++ b/README.rst @@ -38,6 +38,8 @@ Example root.subscribeToStructure((name, change) => { if (change === studio.api.structure.ADD) subscribeToApp(name); + if (change === studio.api.structure.RECONNECT) + console.log(`${name} restarted, subscriptions intact`); }); }).catch(err => console.error("Connection failed:", err)); @@ -61,6 +63,41 @@ Benefits - Simplified firewall configuration - only one port needs to be opened - SSH port forwarding - forward a single port to access entire CDP system +Structure Events +---------------- + +On the root node, ``subscribeToStructure`` tracks application lifecycle with three event types: + +- ``studio.api.structure.ADD`` (1) — An application appeared for the first time +- ``studio.api.structure.REMOVE`` (0) — An application went offline +- ``studio.api.structure.RECONNECT`` (2) — An application restarted (was seen before, went offline, came back) + +On other nodes, ADD and REMOVE fire when children are added or removed at runtime. +RECONNECT only fires at the root level. + +When an app restarts, the client automatically restores value and event subscriptions, +so user code does not need to re-subscribe. RECONNECT is informational — use it for +logging or UI updates. + + .. 
code:: javascript + + client.root().then(root => { + root.subscribeToStructure((appName, change) => { + if (change === studio.api.structure.ADD) { + console.log(`New app online: ${appName}`); + client.find(appName + '.CPULoad').then(node => { + node.subscribeToValues(v => console.log(`[${appName}] CPULoad: ${v}`)); + }).catch(err => console.error(`Failed to find ${appName}.CPULoad:`, err)); + } + if (change === studio.api.structure.REMOVE) { + console.log(`App offline: ${appName}`); + } + if (change === studio.api.structure.RECONNECT) { + console.log(`App restarted: ${appName}, subscriptions intact`); + } + }); + }).catch(err => console.error("Connection failed:", err)); + API --- @@ -325,31 +362,46 @@ client.root() // use the system INode object to access connected structure. }); -client.find(path) -^^^^^^^^^^^^^^^^^ +client.find(path, options) +^^^^^^^^^^^^^^^^^^^^^^^^^ - Arguments - path - Path of the object to look for. + path - Dot-separated path to target node (e.g. ``'App2.CPULoad'``). + + options - Optional. Without this, find() waits indefinitely for the app to appear. + ``{ timeout: milliseconds }`` to limit wait time. + ``{ timeout: 0 }`` to fail immediately if the app is not available. - Returns Promise containing requested INode object when fulfilled. -- Restriction +- Behavior - The requested node must reside in the application client was connected to. + When no timeout option is provided, waits indefinitely for the target application to appear. + If the application is already available, resolves immediately. No prior ``root()`` call is needed — ``find()`` triggers + the connection internally. -- Usage +- Examples - The provided path must contain dot separated path to target node. **Root node is not considered part of the path.** + .. code:: javascript -- Example + // Waits indefinitely for App2 to appear + client.find("App2.CPULoad").then(function (load) { + load.subscribeToValues(function (value) { + console.log("CPULoad:", value); + }); + }); - .. 
code:: javascript + // Wait up to 5 seconds + client.find("App2.CPULoad", { timeout: 5000 }).catch(function (err) { + console.log(err.message); // "App2 not found within 5000ms" + }); - client.find("MyApp.CPULoad").then(function (load) { - // use the load object referring to CPULoad in MyApp + // Fail immediately if not available (old behavior) + client.find("App2.CPULoad", { timeout: 0 }).catch(function (err) { + console.log("Not available right now"); }); client.close() @@ -604,8 +656,9 @@ node.subscribeToStructure(structureConsumer) - Usage - Subscribe to structure changes on this node. Each time child is added or removed from current node - structureConsumer function is called with the name of the node and change argument where ADD == 1 and REMOVE == 0. + Subscribe to structure changes on this node. Each time a child is added or removed, + structureConsumer is called with the child name and change (ADD == 1, REMOVE == 0). + On the root node, RECONNECT (2) fires when a previously-seen application restarts. 
node.unsubscribeFromStructure(structureConsumer) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/index.js b/index.js index 0cbc838..f7cc9cc 100644 --- a/index.js +++ b/index.js @@ -242,7 +242,7 @@ studio.protocol = (function(ProtoBuf) { } // Minimum compat version required for proxy protocol support - var PROXY_MIN_COMPAT_VERSION = 4; + const PROXY_MIN_COMPAT_VERSION = 4; obj.PROXY_MIN_COMPAT_VERSION = PROXY_MIN_COMPAT_VERSION; // Create encoded ServicesRequest container bytes for proxy protocol @@ -398,8 +398,8 @@ studio.protocol = (function(ProtoBuf) { resolve(new ContainerHandler(onContainer, onError, metadata)); } }) - .catch(function(){ - console.log("Application acceptance denied.") + .catch(function(err){ + console.error("Application acceptance failed:", err); resolve(this); }.bind(this)); }.bind(this)); @@ -455,9 +455,17 @@ studio.internal = (function(proto) { obj.structure = { REMOVE: 0, - ADD: 1 + ADD: 1, + RECONNECT: 2 }; + const STRUCTURE_REQUEST_TIMEOUT_MS = 30000; + const MAX_RECONNECT_DELAY_MS = 30000; + const INITIAL_RECONNECT_DELAY_MS = 1000; + const INACTIVITY_RESEND_INTERVAL_S = 120; + const EVENT_HISTORY_WAIT_NS = 120e9; // 120s — accept old events during this window after subscribe + const EVENT_DEDUP_WINDOW_NS = 30e9; // 30s — trim entries older than this after history wait + // Helper to remove first matching item from array (shared by AppNode and SystemNode) function removeFirst(array, predicate) { var idx = array.findIndex(predicate); @@ -479,6 +487,8 @@ studio.internal = (function(proto) { var lastInfo = null; //when we get this, if there are any child requests we need to fetch child fetch too var valid = true; var hasActiveValueSubscription = false; // track if we've sent a getter request to server + var lastServerTimestamp = null; // for value dedup after reconnect + var lastEventTimestamp = 0; // track last received event timestamp for reconnect resume this.path = function() { var path = ""; @@ -514,6 +524,7 @@ 
studio.internal = (function(proto) { givenPromises.forEach(function(promiseHandlers, apiNode) { promiseHandlers.forEach(function(promiseHandler) { try { + clearTimeout(promiseHandler.timer); promiseHandler.reject(apiNode); } catch (e) { /* ignore */ } }); @@ -522,7 +533,7 @@ studio.internal = (function(proto) { }; this.hasSubscriptions = function() { - return valueSubscriptions.length > 0; + return valueSubscriptions.length > 0 || eventSubscriptions.length > 0; }; this.info = function() { @@ -552,9 +563,17 @@ studio.internal = (function(proto) { parent = nodeParent; lastInfo = protoInfo; id = protoInfo.nodeId; + // Keep lastServerTimestamp across reconnect — filters the server's + // cached last-known-value replay (which has the original timestamp). + // Per-listener recentEvents preserved across reconnect — the server + // replays from lastEventTimestamp (inclusive), and preserving each + // listener's cache prevents boundary events from being duplicated. this.async._makeGetterRequest(); - for (var i = 0; i < eventSubscriptions.length; i++) - app.makeEventRequest(id, eventSubscriptions[i][1], false); + for (var i = 0; i < eventSubscriptions.length; i++) { + // Resume from last received event timestamp (not original startingFrom) to avoid duplicates + var resumeFrom = lastEventTimestamp > 0 ? 
lastEventTimestamp : eventSubscriptions[i].startingFrom; + app.makeEventRequest(id, resumeFrom, false); + } }; this.add = function(node) { @@ -579,7 +598,6 @@ studio.internal = (function(proto) { this.done = function() { structureFetched = true; valid = true; // Re-validate node when structure is successfully fetched - //Call process node requests from childRequests givenPromises.forEach(function (promiseHandlers, apiNode) { promiseHandlers.forEach(function(promiseHandler) { if (apiNode.isValid()) { @@ -598,6 +616,16 @@ studio.internal = (function(proto) { }; this.receiveValue = function (nodeValue, nodeTimestamp) { + // Skip values with older timestamps (filters reconnect replay where + // the server resends the last known value with its original timestamp). + if (nodeTimestamp !== undefined) { + var ts = Number(nodeTimestamp); + if (ts > 0) { + if (lastServerTimestamp !== null && ts < lastServerTimestamp) + return; + lastServerTimestamp = ts; + } + } lastValue = nodeValue; for (var i = 0; i < valueSubscriptions.length; i++) { valueSubscriptions[i][0](nodeValue, nodeTimestamp); @@ -605,8 +633,43 @@ studio.internal = (function(proto) { }; this.receiveEvent = function (event) { + // Per-listener event dedup — each subscriber has independent dedup state. + var hasId = event.id !== undefined; + var ts = event.timestamp !== undefined ? Number(event.timestamp) : undefined; + var eventKey = hasId ? String(event.id) : null; + var nowNs = hasId ? Date.now() * 1e6 : 0; + if (ts !== undefined && ts > lastEventTimestamp) lastEventTimestamp = ts; for (var i = 0; i < eventSubscriptions.length; i++) { - eventSubscriptions[i][0](event); + var sub = eventSubscriptions[i]; + if (hasId && ts !== undefined) { + var isPastHistoryWait = nowNs > sub.dedupStartNs + EVENT_HISTORY_WAIT_NS; + // Reject events older than lowest tracked timestamp after history wait. + // When Map is empty (lowestTs 0), any event passes (nothing to compare against). + var lowestTs = sub.recentEvents.size > 0 ? 
sub.lowestTs : 0; + if (ts < lowestTs && isPastHistoryWait) continue; + // Exact (eventId, timestamp) duplicate check using composite key. + // Composite key allows multiple timestamps per eventId (recurring alarms). + var compositeKey = eventKey + ':' + event.timestamp; + if (sub.recentEvents.has(compositeKey)) continue; + sub.recentEvents.set(compositeKey, ts); + if (sub.recentEvents.size === 1 || ts < sub.lowestTs) sub.lowestTs = ts; + // Trim expired entries after history wait. At high event rates, the adaptive + // threshold (trimAt) amortizes the O(n) scan. At low rates (below threshold), + // check if the oldest entry has expired to keep the dedup window accurate. + if (isPastHistoryWait) { + var cutoff = ts - EVENT_DEDUP_WINDOW_NS; + if (sub.recentEvents.size > sub.trimAt + || (sub.recentEvents.size <= sub.trimAt && sub.lowestTs > 0 && sub.lowestTs < cutoff)) { + sub.lowestTs = 0; + sub.recentEvents.forEach(function(storedTs, key) { + if (storedTs < cutoff) sub.recentEvents.delete(key); + else if (sub.lowestTs === 0 || storedTs < sub.lowestTs) sub.lowestTs = storedTs; + }); + sub.trimAt = Math.max(100, sub.recentEvents.size * 2); + } + } + } + sub.callback(event); } }; @@ -618,7 +681,23 @@ studio.internal = (function(proto) { if (!givenPromises.has(apiNode)) { givenPromises.set(apiNode, []); } - givenPromises.get(apiNode).push({resolve: resolve, reject: reject}); + var settled = false; + var entry = { + resolve: function(v) { if (!settled) { settled = true; clearTimeout(entry.timer); resolve(v); } }, + reject: function(v) { if (!settled) { settled = true; clearTimeout(entry.timer); reject(v); } }, + timer: setTimeout(function() { + if (!settled) { + settled = true; + // Remove from givenPromises to prevent double-fire + var arr = givenPromises.get(apiNode); + var idx = arr.indexOf(entry); + if (idx >= 0) arr.splice(idx, 1); + if (arr.length === 0) givenPromises.delete(apiNode); + reject(new Error("Structure request timed out after " + 
STRUCTURE_REQUEST_TIMEOUT_MS + "ms")); + } + }, STRUCTURE_REQUEST_TIMEOUT_MS) + }; + givenPromises.get(apiNode).push(entry); } else { if (apiNode.isValid()) { resolve(apiNode); @@ -652,12 +731,31 @@ studio.internal = (function(proto) { }; this.async.subscribeToEvents = function(eventConsumer, startingFrom) { - eventSubscriptions.push([eventConsumer, startingFrom]); + var existing = null; + for (var si = 0; si < eventSubscriptions.length; si++) { + if (eventSubscriptions[si].callback === eventConsumer) { existing = eventSubscriptions[si]; break; } + } + if (existing) { + existing.startingFrom = startingFrom; + existing.recentEvents = new Map(); + existing.lowestTs = 0; + existing.trimAt = 100; + existing.dedupStartNs = Date.now() * 1e6; + } else { + eventSubscriptions.push({ + callback: eventConsumer, + startingFrom: startingFrom, + recentEvents: new Map(), + lowestTs: 0, + trimAt: 100, + dedupStartNs: Date.now() * 1e6 + }); + } app.makeEventRequest(id, startingFrom, false); }; this.async.unsubscribeFromEvents = function(eventConsumer) { - removeFirst(eventSubscriptions, function(s) { return s[0] === eventConsumer; }); + removeFirst(eventSubscriptions, function(s) { return s.callback === eventConsumer; }); if (eventSubscriptions.length === 0) app.makeEventRequest(id, 0, true); }; @@ -699,9 +797,13 @@ studio.internal = (function(proto) { var connected = false; var connecting = false; var connectGeneration = 0; + var isClosed = false; var structureSubscriptions = []; var announcedApps = new Set(); + var everSeenApps = new Set(); + var pendingFindWaiters = []; // for find() waiting on late apps var pendingFetches = []; + var connectionLocalApps = new Map(); // Maps AppConnection → local app name (direct mode) var this_ = this; function isApplicationNode(node) { @@ -709,6 +811,10 @@ studio.internal = (function(proto) { return info && info.isLocal && info.nodeType === proto.CDPNodeType.CDP_APPLICATION; } + function appAddress(info) { + return info.serverAddr + ':' + 
info.serverPort; + } + function notifyStructure(name, change) { // Notify internal cache invalidation callback (constructor param) if (onStructureChange) { @@ -728,12 +834,85 @@ studio.internal = (function(proto) { }); } + // Wake up find() callers waiting for a specific app to appear + function notifyFindWaiters(appName) { + pendingFindWaiters = pendingFindWaiters.filter(function(waiter) { + if (waiter.appName === appName) { + waiter.resolve(); + return false; // remove from list + } + return true; + }); + } + + // Check if an app is currently connected and announced + this.isAppAvailable = function(appName) { + return announcedApps.has(appName); + }; + + // Check if an app was ever seen (used to distinguish "not yet discovered" from "disconnected") + this.wasAppSeen = function(appName) { + return everSeenApps.has(appName); + }; + + // Check if client is in direct mode (no proxy protocol) + this.isDirectMode = function() { + return appConnections.length > 0 && !appConnections[0].supportsProxyProtocol(); + }; + + // Request a structure refresh from the primary connection (direct mode only). + // In proxy mode, discovery is push-based via ServicesNotification so this is a no-op. + function requestStructureRefresh() { + if (appConnections[0] && !appConnections[0].supportsProxyProtocol()) { + appConnections[0].makeStructureRequest(0); + } + } + + // Register a waiter for a specific app name, with timeout + this.waitForApp = function(appName, timeoutMs) { + // Check if already available + if (announcedApps.has(appName)) { + return Promise.resolve(); + } + + // In direct mode, ask the server now to trigger immediate discovery. + // After this, the server will push eStructureChangeResponse (id 0) + // when siblings start/stop. + requestStructureRefresh(); + + return new Promise(function(resolve, reject) { + var waiter = { appName: appName }; + var timer = timeoutMs > 0 ? 
setTimeout(function() { + pendingFindWaiters = pendingFindWaiters.filter(function(w) { return w !== waiter; }); + reject(new Error(appName + " not found within " + timeoutMs + "ms")); + }, timeoutMs) : null; + waiter.resolve = function() { clearTimeout(timer); resolve(); }; + waiter.reject = function(err) { clearTimeout(timer); reject(err); }; + pendingFindWaiters.push(waiter); + }); + }; + + // Announce an app as ADD (first time) or RECONNECT (seen before). + // No-op if already announced or not a valid application node. + function announceApp(appName, node) { + if (announcedApps.has(appName)) return; + if (!everSeenApps.has(appName) && (!node || !isApplicationNode(node))) return; + var change = everSeenApps.has(appName) ? obj.structure.RECONNECT : obj.structure.ADD; + announcedApps.add(appName); + everSeenApps.add(appName); + notifyStructure(appName, change); + notifyFindWaiters(appName); + } + + function unannounceApp(appName) { + if (!announcedApps.has(appName)) return; + announcedApps.delete(appName); + notifyStructure(appName, obj.structure.REMOVE); + } + function notifyApplications(connection) { connection.root().forEachChild(function (node) { - if (isApplicationNode(node) && !announcedApps.has(node.name())) { - announcedApps.add(node.name()); - notifyStructure(node.name(), obj.structure.ADD); - } + announceApp(node.name(), node); }); } @@ -741,19 +920,30 @@ studio.internal = (function(proto) { var sys = connection.root(); sys.async.onDone(function (system) { notifyApplications(connection); - // Subscribe to structure changes to propagate app ADD/REMOVE at runtime - system.async.subscribeToStructure(function(appName, change) { - var node = system.child(appName); - if (change === obj.structure.ADD && node && isApplicationNode(node)) { - if (!announcedApps.has(appName)) { - announcedApps.add(appName); - notifyStructure(appName, obj.structure.ADD); + + var primaryConn = appConnections[0]; + var isProxyMode = primaryConn && primaryConn.supportsProxyProtocol(); + 
+ if (isProxyMode) { + // Proxy mode: only handle REMOVE here. ADD/RECONNECT is deferred to + // notifyApplications() after the proxy tunnel connects (via + // tryConnectPendingSiblings → connectViaProxy), ensuring the sibling + // is actually reachable before announcing it. + system.async.subscribeToStructure(function(appName, change) { + if (change === obj.structure.REMOVE) { + unannounceApp(appName); } - } else if (change === obj.structure.REMOVE && announcedApps.has(appName)) { - announcedApps.delete(appName); - notifyStructure(appName, obj.structure.REMOVE); - } - }); + }); + } else { + // Direct mode: each connection owns its local app. + // Connection lifecycle directly maps to app lifecycle. + system.forEachChild(function(app) { + if (isApplicationNode(app)) { + connectionLocalApps.set(connection, app.name()); + } + }); + } + resolve(system); }, reject, sys); } @@ -762,6 +952,24 @@ studio.internal = (function(proto) { return new Promise(function (resolve, reject) { var appConnection = new obj.AppConnection(url, notificationListener, autoConnect); appConnections.push(appConnection); + + // Direct mode lifecycle: connection close → REMOVE, reconnect → RECONNECT + appConnection.onDisconnected = function() { + var localApp = connectionLocalApps.get(appConnection); + if (localApp) unannounceApp(localApp); + }; + appConnection.onReconnected = function() { + if (!connectionLocalApps.has(appConnection)) { + // Initial registration may have timed out — populate now + appConnection.root().forEachChild(function(app) { + if (isApplicationNode(app)) { + connectionLocalApps.set(appConnection, app.name()); + } + }); + } + notifyApplications(appConnection); + }; + appConnection.onServiceConnectionEstablished = function(serviceConnection, instanceKey) { serviceConnection.instanceKey = instanceKey; appConnections.push(serviceConnection); @@ -774,9 +982,8 @@ studio.internal = (function(proto) { connectedSiblings.delete(con.siblingKey); } con.root().forEachChild(function(node) 
{ - if (isApplicationNode(node) && announcedApps.has(node.name())) { - announcedApps.delete(node.name()); - notifyStructure(node.name(), obj.structure.REMOVE); + if (isApplicationNode(node)) { + unannounceApp(node.name()); } }); if (closedByUser) { @@ -816,6 +1023,10 @@ studio.internal = (function(proto) { } this.onConnect = function(resolve, reject, autoConnect) { + if (isClosed) { + reject(new Error("Client has been closed")); + return; + } if (connected) { resolve(this_); return; @@ -841,17 +1052,38 @@ studio.internal = (function(proto) { system.forEachChild(function (app) { if (!app.info().isLocal) { - var appUrl = app.info().serverAddr + ":" + app.info().serverPort; + var appUrl = appAddress(app.info()); promises.push(this_.onAppConnect(appUrl, notificationListener, autoConnect)); } }); + // Watch primary connection's structure for new siblings (direct mode) + system.async.subscribeToStructure(function(appName, change) { + if (change === obj.structure.ADD) { + var app = system.child(appName); + if (app && app.info() && !app.info().isLocal) { + // New sibling discovered — check we don't already have a connection + var alreadyConnected = Array.from(connectionLocalApps.values()).indexOf(appName) >= 0; + if (!alreadyConnected) { + var appUrl = appAddress(app.info()); + this_.onAppConnect(appUrl, notificationListener, autoConnect).catch(function(err) { + console.error("Failed to connect to sibling " + appName + ":", err); + }); + } + } + } + }); + // Direct mode discovery: waitForApp() and subscribeToStructure() + // trigger structure refreshes on demand. } else { system.forEachChild(function (app) { if (!app.info().isLocal) { - knownSiblings.add(app.info().serverAddr + ':' + app.info().serverPort); + knownSiblings.add(appAddress(app.info())); } }); + // Separate from registerConnection's structure subscription (which handles + // user-facing app lifecycle via announceApp/unannounceApp). This one manages + // proxy connection establishment when new siblings appear. 
system.async.subscribeToStructure(function(appName, change) { if (change === obj.structure.ADD) { var app = system.child(appName); @@ -859,7 +1091,7 @@ studio.internal = (function(proto) { if (app && appInfo) { if (!appInfo.isLocal) { // Remote sibling - track for proxy connection - knownSiblings.add(appInfo.serverAddr + ':' + appInfo.serverPort); + knownSiblings.add(appAddress(appInfo)); tryConnectPendingSiblings(primaryConnection); } else { // Local sibling came back - re-fetch and resubscribe through primary connection @@ -874,7 +1106,7 @@ studio.internal = (function(proto) { system.forEachChild(function (app) { var appInfo = app.info(); if (appInfo && !appInfo.isLocal) { - knownSiblings.add(appInfo.serverAddr + ':' + appInfo.serverPort); + knownSiblings.add(appAddress(appInfo)); } }); tryConnectPendingSiblings(primaryConnection); @@ -1006,6 +1238,10 @@ studio.internal = (function(proto) { // Only fire callbacks for NEW nodes, not existing ones. // Use forEachChild() to iterate existing children. structureSubscriptions.push(structureConsumer); + // Trigger an initial structure refresh to discover current state. + // After this, the server pushes eStructureChangeResponse (id 0) + // when siblings start/stop. + requestStructureRefresh(); }; this.async.unsubscribeFromStructure = function(structureConsumer) { @@ -1045,6 +1281,7 @@ studio.internal = (function(proto) { * Close all connections managed by this system node. 
*/ this.close = function() { + isClosed = true; connectGeneration++; var err = new Error('Connection closed'); pendingConnects.forEach(function(con) { @@ -1068,6 +1305,14 @@ studio.internal = (function(proto) { knownSiblings.clear(); connectedSiblings.clear(); announcedApps.clear(); + everSeenApps.clear(); + connectionLocalApps.clear(); + // Reject all pending find waiters + pendingFindWaiters.forEach(function(waiter) { + waiter.reject(new Error("Client closed")); + }); + pendingFindWaiters = []; + structureSubscriptions = []; }; }; @@ -1136,7 +1381,15 @@ studio.internal = (function(proto) { var reconnectTimeoutId = null; var currentMetadata = null; var closedIntentionally = false; // Set by close() to prevent reconnection - var SERVICES_TIMEOUT_MS = 150000; // 120s resend interval + 30s buffer + var hasNotifiedDisconnect = false; // Guard for onDisconnected lifecycle callback + var hasConnectedBefore = false; // Distinguishes initial connect from reconnect + const SERVICES_TIMEOUT_MS = (INACTIVITY_RESEND_INTERVAL_S + 30) * 1000; + var reconnectDelayMs = INITIAL_RECONNECT_DELAY_MS; + var lastServerMessageTime = 0; // for stall detection + var stallCheckIntervalId = null; + const STALL_CHECK_INTERVAL_MS = 15000; + const STALL_TIMEOUT_MS = (typeof process !== 'undefined' && process.env && Number(process.env.CDP_STALL_TIMEOUT_MS)) + || (INACTIVITY_RESEND_INTERVAL_S + 30) * 1000; // must exceed inactivityResendInterval nodeMap.set(systemNode.id(), systemNode); handler.onContainer = handleIncomingContainer; this.resubscribe = function(item) { @@ -1172,23 +1425,29 @@ studio.internal = (function(proto) { return next; } - var CONNECT_TIMEOUT_MS = 30000; // 30 second timeout for connect calls - var PROXY_MIN_COMPAT_VERSION = proto.PROXY_MIN_COMPAT_VERSION; // From protocol namespace + const CONNECT_TIMEOUT_MS = 30000; + const PROXY_MIN_COMPAT_VERSION = proto.PROXY_MIN_COMPAT_VERSION; - // Helper to schedule reconnection - avoids duplicate code in onError and onClosed + // 
Helper to schedule reconnection with exponential backoff function scheduleReconnect(logMessage) { if (!autoConnect || !isPrimaryConnection || reconnectTimeoutId) return; + var delay = reconnectDelayMs; + // Add jitter (±20%) to prevent thundering herd + var jitter = delay * 0.2 * (2 * Math.random() - 1); + delay = Math.round(delay + jitter); reconnectTimeoutId = setTimeout(function() { reconnectTimeoutId = null; if (!socketTransport) return; // Ensure proxy state is cleaned up (may already be done by onClosed, but // onError can fire without onClose in Node.js — idempotent if already clean) cleanupPrimaryConnectionState(); - console.log(logMessage); + console.log(logMessage + " (backoff: " + delay + "ms)"); socketTransport.reconnect(appUrl, proto.BINARY_TYPE); handler = new proto.Handler(socketTransport, notificationListener); handler.onContainer = handleIncomingContainer; - }, 3000); + }, delay); + // Exponential backoff: double up to 30s max + reconnectDelayMs = Math.min(reconnectDelayMs * 2, MAX_RECONNECT_DELAY_MS); } function removeServiceConnection(instanceKey, closedByUser) { @@ -1271,9 +1530,9 @@ studio.internal = (function(proto) { pendingSends.push(bytes); } }; - transport.close = function() { - if (isClosed) return; // Already closed - transport._closedByUser = true; + transport.close = function(intentional) { + if (isClosed) return; + transport._closedByUser = !!intentional; cleanup(); appConnection.sendServiceMessage(serviceId, instanceId, proto.ServiceMessageKind.eDisconnect); }; @@ -1321,6 +1580,36 @@ studio.internal = (function(proto) { servicesTimeoutId = null; } + // Stall detection: force-close socket if no server messages for STALL_TIMEOUT_MS + // while there are active subscriptions expecting data. Without subscriptions, + // silence is expected and not a stall. 
+ function hasAnyActiveSubscriptions() { + for (var node of nodeMap.values()) { + if (node.hasSubscriptions()) return true; + } + return false; + } + + function startStallDetection() { + if (stallCheckIntervalId) return; + lastServerMessageTime = Date.now(); + stallCheckIntervalId = setInterval(function() { + if (lastServerMessageTime > 0 && hasAnyActiveSubscriptions() && + Date.now() - lastServerMessageTime > STALL_TIMEOUT_MS) { + console.log("Connection stalled: no server messages for " + STALL_TIMEOUT_MS + "ms with active subscriptions, forcing reconnect"); + stopStallDetection(); + socketTransport.close(); + } + }, STALL_CHECK_INTERVAL_MS); + } + + function stopStallDetection() { + clearInterval(stallCheckIntervalId); + stallCheckIntervalId = null; + } + + this._startStallDetection = startStallDetection; + function cleanupPrimaryConnectionState() { if (!isPrimaryConnection) return; // Notify service instances of disconnect @@ -1336,6 +1625,7 @@ studio.internal = (function(proto) { availableServices.clear(); currentMetadata = null; requests = []; + stopStallDetection(); } this.onServicesReceived = function(services, metadata) { @@ -1419,11 +1709,11 @@ studio.internal = (function(proto) { if (existingConnection) { // Reconnect existing transport with new service instance — preserves nodes and callbacks proxyConnection = existingConnection; - proxyConnection.serviceId = newServiceId; var transport = proxyConnection._getTransport(); // Set onopen to trigger handler recreation + resubscribe (the original onopen // was overwritten by the first connectViaProxy call's new-connection handler) transport.onopen = function() { + proxyConnection._startStallDetection(); proxyConnection._triggerReconnect(); }; // Transport.reconnect() allocates a new instance and fires onopen when connected @@ -1436,7 +1726,6 @@ studio.internal = (function(proto) { proxyConnection = new obj.AppConnection(result.transport, notificationListener, autoConnect); proxyConnection.instanceKey = 
result.instanceKey; proxyConnection.siblingKey = addr + ':' + port; - proxyConnection.serviceId = newServiceId; serviceConnections.set(result.instanceKey, proxyConnection); } @@ -1454,6 +1743,7 @@ studio.internal = (function(proto) { if (!existingConnection) { var transport = proxyConnection._getTransport(); transport.onopen = function() { + proxyConnection._startStallDetection(); if (appConnection.onServiceConnectionEstablished) { appConnection.onServiceConnectionEstablished(proxyConnection, proxyConnection.instanceKey); } @@ -1502,7 +1792,7 @@ studio.internal = (function(proto) { this.sendServiceMessage = function(serviceId, instanceId, kind, payload) { var serviceMessage = proto.ServiceMessage.create({ serviceId: serviceId, - instanceId: instanceId || 0, + instanceId: instanceId, kind: kind }); if (payload) { @@ -1517,6 +1807,8 @@ studio.internal = (function(proto) { // Returns true if proxy protocol is supported (compat >= PROXY_MIN_COMPAT_VERSION) // When true, backends are accessed via ServiceMessage tunneling, not direct connections this.supportsProxyProtocol = function() { + if (typeof process !== 'undefined' && process.env && process.env.CDP_FORCE_DIRECT_MODE === '1') + return false; return currentMetadata && currentMetadata.compatVersion >= PROXY_MIN_COMPAT_VERSION; }; @@ -1531,10 +1823,22 @@ studio.internal = (function(proto) { // Clear any pending reconnect timeout since we're now connected clearTimeout(reconnectTimeoutId); reconnectTimeoutId = null; + reconnectDelayMs = INITIAL_RECONNECT_DELAY_MS; + hasNotifiedDisconnect = false; // Reset disconnect guard for next cycle + startStallDetection(); // Note: For proxy connections, connectViaProxy overwrites transport.onopen - // with _triggerReconnect(), so this handler only fires for the primary connection. + // and calls _startStallDetection() there instead. // Primary handler recreation happens in scheduleReconnect before reconnect(). 
appConnection.resubscribe(systemNode); + // Notify lifecycle callback after structure refetch completes (not on initial connect) + if (hasConnectedBefore && appConnection.onReconnected) { + systemNode.async.onDone(function() { + appConnection.onReconnected(); + }, function(err) { + console.error("Structure refetch failed on reconnect:", err); + }, systemNode); + } + hasConnectedBefore = true; }; onClosed = function (event) { if (closedIntentionally) return; @@ -1572,6 +1876,13 @@ studio.internal = (function(proto) { console.log("Socket close: " + reason); + // Notify lifecycle callback once per disconnect (not on each reconnection attempt) + if (!hasNotifiedDisconnect && appConnection.onDisconnected) { + hasNotifiedDisconnect = true; + appConnection.onDisconnected(); + } + + stopStallDetection(); clearServicesTimeout(); clearTimeout(reconnectTimeoutId); reconnectTimeoutId = null; @@ -1638,6 +1949,8 @@ studio.internal = (function(proto) { } if (stop) { request.stop = stop; + } else { + request.inactivityResendInterval = INACTIVITY_RESEND_INTERVAL_S; } msg.messageType = proto.ContainerType.eGetterRequest; msg.getterRequest = [request]; @@ -1650,6 +1963,8 @@ studio.internal = (function(proto) { request.nodeId = id; if (stop) { request.stop = stop; + } else { + request.inactivityResendInterval = INACTIVITY_RESEND_INTERVAL_S; } if (startingFrom != undefined) { request.startingFrom = startingFrom; @@ -1847,6 +2162,7 @@ studio.internal = (function(proto) { } function handleIncomingContainer(protoContainer, metadata) { + lastServerMessageTime = Date.now(); // Update stall detection timestamp // Set currentMetadata from Hello message immediately (not just on ServicesNotification) // This ensures supportsProxyProtocol() works before ServicesNotification arrives if (!currentMetadata && metadata) { @@ -1913,20 +2229,17 @@ studio.internal = (function(proto) { * For primary connections, this closes the WebSocket. 
*/ this.close = function() { - // Mark as intentionally closed to prevent reconnection attempts closedIntentionally = true; - // Clear pending reconnect to prevent accessing null socketTransport + stopStallDetection(); clearTimeout(reconnectTimeoutId); reconnectTimeoutId = null; - // Clear services timeout to prevent timer firing after close clearServicesTimeout(); - // Reset auth state to prevent stale state on reconnect reauthRequestPending = false; cleanupPrimaryConnectionState(); if (socketTransport) { var transport = socketTransport; socketTransport = null; // Guard against double-close - transport.close(); + transport.close(true); } }; }; @@ -2118,7 +2431,7 @@ studio.api = (function(internal) { * * @callback structureConsumer * @param {string} node name - * @param {number} REMOVE 0/ADD 1 from studio.api.structure.ADD / studio.api.structure.REMOVE + * @param {number} change - ADD (1), REMOVE (0), or RECONNECT (2) from studio.api.structure */ /** @@ -2234,7 +2547,7 @@ studio.api = (function(internal) { var findNodeCacheInvalidator = null; // Set after findNodeCache is created var system = new internal.SystemNode(studioURL, notificationListener, function(appName) { - // Called when app structure changes (ADD or REMOVE) + // Called on app structure changes (ADD, REMOVE, or RECONNECT) findNodeCacheInvalidator(appName); }); @@ -2263,7 +2576,7 @@ studio.api = (function(internal) { var nodes = {}; function f(promise, nodeName, index, arr) { - var path = arr.slice(0,index+1); + var path = arr.slice(0,index+1).join('.'); if (memoize[path] && !nodes[path]) return memoize[path]; else if (memoize[path] && nodes[path] && nodes[path].isValid()) @@ -2284,8 +2597,7 @@ studio.api = (function(internal) { // Invalidate cache entries for a specific app name (called on structure changes) f.invalidateApp = function(appName) { Object.keys(memoize).forEach(function(key) { - // Key is array converted to string, e.g. 
"App2" or "App2,CPULoad" - if (key === appName || key.startsWith(appName + ',')) { + if (key === appName || key.startsWith(appName + '.')) { delete memoize[key]; delete nodes[key]; } @@ -2297,14 +2609,46 @@ studio.api = (function(internal) { var findNode = findNodeCache; findNodeCacheInvalidator = findNodeCache.invalidateApp; /** - * Request node with provided path. + * Request node with provided path. Waits indefinitely for the target app + * to appear if it is not yet available. * - * @param nodePath Should contain dot separated path to target node. Note: root node is not considered part of the path. + * @param nodePath Dot-separated path to target node (e.g. 'App2.CPULoad'). + * @param options Optional. { timeout: milliseconds } to limit wait time. + * Use { timeout: 0 } to fail immediately if the app is not available. * @returns {Promise.<INode>} A promise containing requested node when fulfilled. */ - this.find = function(nodePath) { - var nodes = nodePath.split("."); - return nodes.reduce(findNode, this.root()); + this.find = function(nodePath, options) { + if (!nodePath) { + return Promise.reject("Child not found: "); + } + var pathParts = nodePath.split("."); + var appName = pathParts[0]; + var self = this; + + function doFind() { + // Reject immediately if the app was previously seen but is now disconnected. + if (system.wasAppSeen(appName) && !system.isAppAvailable(appName)) { + return Promise.reject(appName + " is not available"); + } + return pathParts.reduce(findNode, self.root()); + } + + // timeout: 0 means immediate fail (old behavior) + if (options && options.timeout === 0) { + return doFind(); + } + + // timeout > 0 means wait up to that many ms; no timeout means wait indefinitely + var timeoutMs = (options && options.timeout > 0) ? options.timeout : 0; + + // Trigger connection before waiting — prevents deadlock when find() + // is called without a prior root() call. Surface connection errors + // to the caller instead of swallowing them.
+ return self.root().then(function() { + return system.waitForApp(appName, timeoutMs); + }).then(function() { + return doFind(); + }); }; this._getAppConnections = function() { diff --git a/package.json b/package.json index 1dccfa9..ac80f85 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "cdp-client", - "version": "2.5.0", + "version": "3.0.0", "description": "A simple Javascript interface for the CDP Studio development platform that allows Javascript applications to interact with", "main": "index.js", "scripts": { diff --git a/test/duplicate-values-reconnect.test.js b/test/duplicate-values-reconnect.test.js index 233ef3a..1c7833b 100644 --- a/test/duplicate-values-reconnect.test.js +++ b/test/duplicate-values-reconnect.test.js @@ -521,9 +521,7 @@ describe('Value Delivery', () => { expect(systemNode.lastValue()).toBe(43.5); }); - test('calling receiveValue twice with same timestamp should still call callback twice (no dedup)', () => { - // This tests that receiveValue is a simple passthrough - it doesn't deduplicate - // Deduplication should happen at a higher level if needed + test('calling receiveValue twice with same value and increasing timestamps delivers both', () => { const transport = new FakeTransport(); const app = new internal.AppConnection(transport, null, false); @@ -532,12 +530,17 @@ describe('Value Delivery', () => { app.root().async.subscribeToValues(valueConsumer, 5, 0); - // Intentionally send same value twice (simulates potential duplicate from reconnect) app.root().receiveValue(100, 1000); - app.root().receiveValue(100, 1000); // Same timestamp! 
+ app.root().receiveValue(100, 1001); // same value, newer timestamp — delivered + expect(callbackCount.count).toBe(2); - // This tests current behavior - if the implementation SHOULD deduplicate, this test documents it doesn't + // Same value with OLDER timestamp — filtered (reconnect replay) + app.root().receiveValue(100, 999); expect(callbackCount.count).toBe(2); + + // Different value with SAME timestamp as last — delivered (not filtered) + app.root().receiveValue(200, 1001); + expect(callbackCount.count).toBe(3); }); }); diff --git a/test/find-and-structure-events.test.js b/test/find-and-structure-events.test.js new file mode 100644 index 0000000..ff2df8f --- /dev/null +++ b/test/find-and-structure-events.test.js @@ -0,0 +1,68 @@ +/** + * find() wait semantics and RECONNECT structure event tests + * + * Unit tests for the new public API surfaces. Tests that require + * a live connection (find() timeout behavior, subscribeToStructure + * RECONNECT events) are covered by the component tests in the + * parent cdp monorepo. 
+ */ + +global.WebSocket = require('ws'); +const studio = require('../index'); + +describe('RECONNECT structure constant', () => { + test('studio.api.structure has ADD, REMOVE, and RECONNECT with correct values', () => { + expect(studio.api.structure.ADD).toBe(1); + expect(studio.api.structure.REMOVE).toBe(0); + expect(studio.api.structure.RECONNECT).toBe(2); + }); + + test('RECONNECT is distinct from ADD and REMOVE', () => { + const values = [studio.api.structure.ADD, studio.api.structure.REMOVE, studio.api.structure.RECONNECT]; + expect(new Set(values).size).toBe(3); + }); +}); + +describe('find() input validation', () => { + test('find() with empty path rejects', async () => { + const client = new studio.api.Client('ws://127.0.0.1:1', {}, false); + await expect(client.find('')).rejects.toContain('not found'); + client.close(); + }); + + test('find() with undefined path rejects', async () => { + const client = new studio.api.Client('ws://127.0.0.1:1', {}, false); + await expect(client.find()).rejects.toContain('not found'); + client.close(); + }); +}); + +describe('close() is terminal', () => { + test('close() can be called multiple times without error', () => { + const client = new studio.api.Client('ws://127.0.0.1:1', {}, false); + expect(() => { + client.close(); + client.close(); + }).not.toThrow(); + }); + + test('root() after close() rejects with closed error', async () => { + const client = new studio.api.Client('ws://127.0.0.1:1', {}, false); + client.close(); + await expect(client.root()).rejects.toThrow(); + }); + + test('find() after close() rejects with closed error', async () => { + const client = new studio.api.Client('ws://127.0.0.1:1', {}, false); + client.close(); + await expect(client.find('App.Signal')).rejects.toThrow(); + }); + + test('close() rejects pending find() waiters', async () => { + const client = new studio.api.Client('ws://127.0.0.1:1', {}, false); + const findPromise = client.find('NonExistentApp.Signal'); + await new Promise(r => 
setTimeout(r, 50)); + client.close(); + await expect(findPromise).rejects.toThrow(); + }); +});