Skip to content

Commit 6af6598

Browse files
committed
address comments
1 parent 6ccd90c commit 6af6598

8 files changed

Lines changed: 48 additions & 38 deletions

File tree

apps/sim/app/api/workflows/[id]/execute/route.ts

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import {
2727
} from '@/lib/execution/call-chain'
2828
import {
2929
createExecutionEventWriter,
30-
finalizeExecutionStream,
30+
flushExecutionStreamReplayBuffer,
3131
initializeExecutionStreamMeta,
3232
type TerminalExecutionStreamStatus,
3333
} from '@/lib/execution/event-buffer'
@@ -1296,15 +1296,14 @@ async function handleExecutePost(
12961296
isManualAbortRegistered = false
12971297
}
12981298
if (finalMetaStatus && !terminalEventPublished) {
1299-
const terminalPublished = await finalizeExecutionStream(
1299+
const replayBufferFlushed = await flushExecutionStreamReplayBuffer(
13001300
executionId,
1301-
eventWriter,
1302-
finalMetaStatus
1301+
eventWriter
13031302
)
13041303
reqLogger.error('Failed to publish terminal execution event durably', {
13051304
executionId,
13061305
status: finalMetaStatus,
1307-
replayBufferFlushed: terminalPublished,
1306+
replayBufferFlushed,
13081307
})
13091308
if (!isStreamClosed) {
13101309
controller.error(new Error('Run buffer terminal event publish failed'))

apps/sim/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils.test.ts

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -427,7 +427,7 @@ describe('workflow-execution-utils', () => {
427427
executionId: 'exec-1',
428428
executionOrder: 3,
429429
isRunning: true,
430-
childWorkflowBlockId: 'child-inst-1',
430+
childWorkflowBlockId: 'workflow-1',
431431
childWorkflowName: 'Workflow 1',
432432
})
433433
terminalConsoleMockFns.mockAddConsole({
@@ -489,7 +489,7 @@ describe('workflow-execution-utils', () => {
489489
success: true,
490490
isRunning: false,
491491
isCanceled: false,
492-
childWorkflowBlockId: 'child-inst-1',
492+
childWorkflowBlockId: 'workflow-1',
493493
}),
494494
'exec-1',
495495
])
@@ -501,7 +501,7 @@ describe('workflow-execution-utils', () => {
501501
error: 'Request failed',
502502
isRunning: false,
503503
isCanceled: false,
504-
childWorkflowBlockId: 'child-inst-1',
504+
childWorkflowBlockId: 'workflow-1',
505505
}),
506506
'exec-1',
507507
])
@@ -529,7 +529,7 @@ describe('workflow-execution-utils', () => {
529529
iterationCurrent: 0,
530530
iterationType: 'loop',
531531
iterationContainerId: 'loop-1',
532-
childWorkflowBlockId: 'child-inst-1',
532+
childWorkflowBlockId: 'workflow-1',
533533
})
534534
terminalConsoleMockFns.mockAddConsole({
535535
workflowId: 'wf-1',
@@ -542,7 +542,7 @@ describe('workflow-execution-utils', () => {
542542
iterationCurrent: 1,
543543
iterationType: 'loop',
544544
iterationContainerId: 'loop-1',
545-
childWorkflowBlockId: 'child-inst-1',
545+
childWorkflowBlockId: 'workflow-1',
546546
})
547547

548548
const startedAt = new Date().toISOString()
@@ -632,7 +632,7 @@ describe('workflow-execution-utils', () => {
632632
executionId: 'exec-1',
633633
executionOrder: 3,
634634
isRunning: false,
635-
childWorkflowBlockId: 'child-inst-1',
635+
childWorkflowBlockId: 'workflow-1',
636636
childWorkflowInstanceId: 'nested-inst-1',
637637
})
638638
terminalConsoleMockFns.mockAddConsole({
@@ -643,7 +643,7 @@ describe('workflow-execution-utils', () => {
643643
executionId: 'exec-1',
644644
executionOrder: 1,
645645
isRunning: true,
646-
childWorkflowBlockId: 'nested-inst-1',
646+
childWorkflowBlockId: 'nested-workflow',
647647
})
648648

649649
const startedAt = new Date().toISOString()
@@ -688,7 +688,7 @@ describe('workflow-execution-utils', () => {
688688
expect(updateConsole.mock.calls[1]).toEqual([
689689
'nested-api',
690690
expect.objectContaining({
691-
childWorkflowBlockId: 'nested-inst-1',
691+
childWorkflowBlockId: 'nested-workflow',
692692
success: true,
693693
isRunning: false,
694694
isCanceled: false,

apps/sim/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils.ts

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -512,6 +512,7 @@ export function reconcileFinalBlockLogs(
512512
reconcileChildTraceSpans(
513513
updateConsole,
514514
workflowId,
515+
log.blockId,
515516
childWorkflowInstanceId,
516517
executionId,
517518
log.childTraceSpans
@@ -523,20 +524,21 @@ export function reconcileFinalBlockLogs(
523524
function reconcileChildTraceSpans(
524525
updateConsole: UpdateConsoleFn,
525526
workflowId: string,
527+
childWorkflowBlockId: string,
526528
childWorkflowInstanceId: string,
527529
executionId: string,
528530
spans: TraceSpan[]
529531
): void {
530532
for (const span of spans) {
531533
const matchingEntry = span.blockId
532-
? findConsoleEntryForSpan(workflowId, executionId, childWorkflowInstanceId, span)
534+
? findConsoleEntryForSpan(workflowId, executionId, childWorkflowBlockId, span)
533535
: undefined
534536
if (span.blockId) {
535537
const errorMessage = normalizeSpanError(span.output?.error)
536538
updateConsole(
537539
span.blockId,
538540
{
539-
...spanConsoleIdentity(span, childWorkflowInstanceId),
541+
...spanConsoleIdentity(span, childWorkflowBlockId),
540542
replaceOutput: (span.output ?? {}) as Record<string, unknown>,
541543
success: span.status !== 'error',
542544
...(errorMessage !== undefined ? { error: errorMessage } : {}),
@@ -553,6 +555,7 @@ function reconcileChildTraceSpans(
553555
reconcileChildTraceSpans(
554556
updateConsole,
555557
workflowId,
558+
matchingEntry?.blockId ?? childWorkflowBlockId,
556559
matchingEntry?.childWorkflowInstanceId ?? childWorkflowInstanceId,
557560
executionId,
558561
span.children
@@ -561,7 +564,7 @@ function reconcileChildTraceSpans(
561564
}
562565
}
563566

564-
function spanConsoleIdentity(span: TraceSpan, childWorkflowInstanceId: string): ConsoleUpdate {
567+
function spanConsoleIdentity(span: TraceSpan, childWorkflowBlockId: string): ConsoleUpdate {
565568
const iterationContainerId = span.loopId ?? span.parallelId
566569
const iterationType = span.loopId ? 'loop' : span.parallelId ? 'parallel' : undefined
567570
return {
@@ -570,18 +573,18 @@ function spanConsoleIdentity(span: TraceSpan, childWorkflowInstanceId: string):
570573
...(iterationType !== undefined && { iterationType }),
571574
...(iterationContainerId !== undefined && { iterationContainerId }),
572575
...(span.parentIterations !== undefined && { parentIterations: span.parentIterations }),
573-
childWorkflowBlockId: childWorkflowInstanceId,
576+
childWorkflowBlockId,
574577
}
575578
}
576579

577580
function findConsoleEntryForSpan(
578581
workflowId: string,
579582
executionId: string,
580-
childWorkflowInstanceId: string,
583+
childWorkflowBlockId: string,
581584
span: TraceSpan
582585
): ConsoleEntry | undefined {
583586
if (!span.blockId) return undefined
584-
const identity = spanConsoleIdentity(span, childWorkflowInstanceId)
587+
const identity = spanConsoleIdentity(span, childWorkflowBlockId)
585588
return useTerminalConsoleStore
586589
.getState()
587590
.getWorkflowEntries(workflowId)
@@ -615,6 +618,13 @@ function matchesConsoleIdentity(entry: ConsoleEntry, identity: ConsoleUpdate): b
615618
) {
616619
return false
617620
}
621+
if (
622+
identity.childWorkflowInstanceId !== undefined &&
623+
entry.childWorkflowInstanceId !== undefined &&
624+
entry.childWorkflowInstanceId !== identity.childWorkflowInstanceId
625+
) {
626+
return false
627+
}
618628
return true
619629
}
620630

apps/sim/hooks/use-execution-stream.ts

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -64,12 +64,8 @@ function isClientDisconnectError(error: any): boolean {
6464
return error.name === 'AbortError'
6565
}
6666

67-
function isAbortDisconnectError(error: any): boolean {
68-
return error.name === 'AbortError'
69-
}
70-
7167
function isRecoverableStreamError(error: any): boolean {
72-
if (isAbortDisconnectError(error)) return false
68+
if (isClientDisconnectError(error)) return false
7369
const msg = (error.message ?? '').toLowerCase()
7470
return (
7571
msg.includes('network error') || msg.includes('failed to fetch') || msg.includes('load failed')
@@ -466,7 +462,7 @@ export function useExecutionStream() {
466462

467463
await processSSEStream(response.body.getReader(), callbacks, 'Reconnect')
468464
} catch (error: any) {
469-
if (isAbortDisconnectError(error)) return
465+
if (isClientDisconnectError(error)) return
470466
logger.error('Reconnection stream error:', error)
471467
throw error
472468
} finally {

apps/sim/lib/execution/event-buffer.test.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ vi.mock('@/lib/core/config/redis', () => ({
2727

2828
import {
2929
createExecutionEventWriter,
30-
finalizeExecutionStream,
30+
flushExecutionStreamReplayBuffer,
3131
initializeExecutionStreamMeta,
3232
readExecutionEventsState,
3333
} from '@/lib/execution/event-buffer'
@@ -215,7 +215,7 @@ describe('execution event buffer', () => {
215215
await expect(writer.write(makeEvent('lost'))).rejects.toThrow('redis reservation failed')
216216
await writer.write(makeEvent('terminal'))
217217

218-
await expect(finalizeExecutionStream('exec-1', writer, 'complete')).resolves.toBe(true)
218+
await expect(flushExecutionStreamReplayBuffer('exec-1', writer)).resolves.toBe(true)
219219
expect(persistedEntries.map((entry) => entry.eventId)).toEqual([101])
220220
expect(mockRedis.hset).not.toHaveBeenCalledWith(
221221
expect.any(String),
@@ -230,7 +230,7 @@ describe('execution event buffer', () => {
230230
const writer = createExecutionEventWriter('exec-1')
231231
await writer.write(makeEvent('terminal'))
232232

233-
await expect(finalizeExecutionStream('exec-1', writer, 'complete')).resolves.toBe(false)
233+
await expect(flushExecutionStreamReplayBuffer('exec-1', writer)).resolves.toBe(false)
234234
expect(mockRedis.hset).not.toHaveBeenCalled()
235235
})
236236

@@ -269,7 +269,7 @@ describe('execution event buffer', () => {
269269
const writer = createExecutionEventWriter('exec-1')
270270
await writer.write(makeEvent('terminal'))
271271

272-
await expect(finalizeExecutionStream('exec-1', writer, 'complete')).resolves.toBe(true)
272+
await expect(flushExecutionStreamReplayBuffer('exec-1', writer)).resolves.toBe(true)
273273
expect(persistedEntries.map((entry) => entry.eventId)).toEqual([1])
274274
expect(mockRedis.hset).not.toHaveBeenCalledWith(
275275
expect.any(String),

apps/sim/lib/execution/event-buffer.ts

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -211,10 +211,9 @@ function createMemoryExecutionEventWriter(executionId: string): ExecutionEventWr
211211
}
212212
}
213213

214-
export async function finalizeExecutionStream(
214+
export async function flushExecutionStreamReplayBuffer(
215215
executionId: string,
216-
writer: ExecutionEventWriter,
217-
status: TerminalExecutionStreamStatus
216+
writer: ExecutionEventWriter
218217
): Promise<boolean> {
219218
let writerClosed = false
220219
for (let attempt = 1; attempt <= FINALIZE_FLUSH_ATTEMPTS; attempt++) {
@@ -227,7 +226,6 @@ export async function finalizeExecutionStream(
227226
} catch (error) {
228227
logger.warn('Failed to flush execution stream replay buffer during finalization', {
229228
executionId,
230-
status,
231229
attempt,
232230
error: toError(error).message,
233231
})

apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import type { Edge } from 'reactflow'
88
import { createTimeoutAbortController, getTimeoutErrorMessage } from '@/lib/core/execution-limits'
99
import {
1010
createExecutionEventWriter,
11-
finalizeExecutionStream,
11+
flushExecutionStreamReplayBuffer,
1212
initializeExecutionStreamMeta,
1313
resetExecutionStreamBuffer,
1414
type TerminalExecutionStreamStatus,
@@ -1282,15 +1282,14 @@ export class PauseResumeManager {
12821282
} finally {
12831283
timeoutController?.cleanup()
12841284
if (!terminalEventPublished) {
1285-
const terminalPublished = await finalizeExecutionStream(
1285+
const replayBufferFlushed = await flushExecutionStreamReplayBuffer(
12861286
resumeExecutionId,
1287-
eventWriter,
1288-
finalMetaStatus
1287+
eventWriter
12891288
)
12901289
logger.warn('Failed to publish resume terminal event durably', {
12911290
resumeExecutionId,
12921291
status: finalMetaStatus,
1293-
replayBufferFlushed: terminalPublished,
1292+
replayBufferFlushed,
12941293
})
12951294
if (!executionError) {
12961295
executionError = new Error(TERMINAL_PUBLISH_ERROR)

apps/sim/stores/terminal/console/store.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,14 @@ const matchesEntryForUpdate = (
110110
return false
111111
}
112112

113+
if (
114+
update.childWorkflowInstanceId !== undefined &&
115+
entry.childWorkflowInstanceId !== undefined &&
116+
entry.childWorkflowInstanceId !== update.childWorkflowInstanceId
117+
) {
118+
return false
119+
}
120+
113121
return true
114122
}
115123

0 commit comments

Comments
 (0)