Skip to content

Commit 31ecd39

Browse files
waleedlatif1claude
andcommitted
improvement(executor): correctness-by-construction for workflow logs
Replace the post-hoc reconciliation layer with a deterministic emission protocol: drain pending callback promises at terminal boundaries, mint a per-invocation blockExecutionId, and key console entries by that ID. Eliminates races between block:* and execution:* events without changing per-block latency. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1 parent a9c12a2 commit 31ecd39

12 files changed

Lines changed: 229 additions & 424 deletions

File tree

apps/sim/app/api/workflows/[id]/execute/route.ts

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -919,7 +919,8 @@ async function handleExecutePost(
919919
blockType: string,
920920
executionOrder: number,
921921
iterationContext?: IterationContext,
922-
childWorkflowContext?: ChildWorkflowContext
922+
childWorkflowContext?: ChildWorkflowContext,
923+
blockExecutionId?: string
923924
) => {
924925
reqLogger.info('onBlockStart called', { blockId, blockName, blockType })
925926
sendEvent({
@@ -945,6 +946,7 @@ async function handleExecutePost(
945946
childWorkflowBlockId: childWorkflowContext.parentBlockId,
946947
childWorkflowName: childWorkflowContext.workflowName,
947948
}),
949+
...(blockExecutionId && { blockExecutionId }),
948950
},
949951
})
950952
}
@@ -955,7 +957,8 @@ async function handleExecutePost(
955957
blockType: string,
956958
callbackData: any,
957959
iterationContext?: IterationContext,
958-
childWorkflowContext?: ChildWorkflowContext
960+
childWorkflowContext?: ChildWorkflowContext,
961+
blockExecutionId?: string
959962
) => {
960963
const hasError = callbackData.output?.error
961964
const childWorkflowData = childWorkflowContext
@@ -969,6 +972,11 @@ async function handleExecutePost(
969972
? { childWorkflowInstanceId: callbackData.childWorkflowInstanceId }
970973
: {}
971974

975+
const resolvedBlockExecutionId = blockExecutionId ?? callbackData.blockExecutionId
976+
const blockExecData = resolvedBlockExecutionId
977+
? { blockExecutionId: resolvedBlockExecutionId }
978+
: {}
979+
972980
if (hasError) {
973981
reqLogger.info('onBlockComplete (error) called', {
974982
blockId,
@@ -1002,6 +1010,7 @@ async function handleExecutePost(
10021010
}),
10031011
...childWorkflowData,
10041012
...instanceData,
1013+
...blockExecData,
10051014
},
10061015
})
10071016
} else {
@@ -1036,6 +1045,7 @@ async function handleExecutePost(
10361045
}),
10371046
...childWorkflowData,
10381047
...instanceData,
1048+
...blockExecData,
10391049
},
10401050
})
10411051
}
@@ -1165,7 +1175,6 @@ async function handleExecutePost(
11651175
data: {
11661176
error: timeoutErrorMessage,
11671177
duration: result.metadata?.duration || 0,
1168-
finalBlockLogs: result.logs,
11691178
},
11701179
})
11711180
finalMetaStatus = 'error'
@@ -1179,7 +1188,6 @@ async function handleExecutePost(
11791188
workflowId,
11801189
data: {
11811190
duration: result.metadata?.duration || 0,
1182-
finalBlockLogs: result.logs,
11831191
},
11841192
})
11851193
finalMetaStatus = 'cancelled'
@@ -1244,7 +1252,6 @@ async function handleExecutePost(
12441252
data: {
12451253
error: executionResult?.error || errorMessage,
12461254
duration: executionResult?.metadata?.duration || 0,
1247-
finalBlockLogs: executionResult?.logs,
12481255
},
12491256
})
12501257
finalMetaStatus = 'error'

apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts

Lines changed: 5 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -218,31 +218,25 @@ export function useWorkflowExecution() {
218218
durationMs?: number
219219
blockLogs: BlockLog[]
220220
isPreExecutionError?: boolean
221-
finalBlockLogs?: BlockLog[]
222221
}) => {
223222
if (!params.workflowId) return
224223
sharedHandleExecutionErrorConsole(
225-
{ addConsole, updateConsole, cancelRunningEntries },
224+
{ addConsole, updateConsole },
226225
{ ...params, workflowId: params.workflowId }
227226
)
228227
},
229-
[addConsole, cancelRunningEntries, updateConsole]
228+
[addConsole, updateConsole]
230229
)
231230

232231
const handleExecutionCancelledConsole = useCallback(
233-
(params: {
234-
workflowId?: string
235-
executionId?: string
236-
durationMs?: number
237-
finalBlockLogs?: BlockLog[]
238-
}) => {
232+
(params: { workflowId?: string; executionId?: string; durationMs?: number }) => {
239233
if (!params.workflowId) return
240234
sharedHandleExecutionCancelledConsole(
241-
{ addConsole, updateConsole, cancelRunningEntries },
235+
{ addConsole, updateConsole },
242236
{ ...params, workflowId: params.workflowId }
243237
)
244238
},
245-
[addConsole, cancelRunningEntries, updateConsole]
239+
[addConsole, updateConsole]
246240
)
247241

248242
const buildBlockEventHandlers = useCallback(
@@ -1024,8 +1018,6 @@ export function useWorkflowExecution() {
10241018
accumulatedBlockLogs,
10251019
accumulatedBlockStates,
10261020
executedBlockIds,
1027-
consoleMode: 'update',
1028-
includeStartConsoleEntry: true,
10291021
onBlockCompleteCallback: onBlockComplete,
10301022
})
10311023

@@ -1228,7 +1220,6 @@ export function useWorkflowExecution() {
12281220
durationMs: data.duration,
12291221
blockLogs: accumulatedBlockLogs,
12301222
isPreExecutionError,
1231-
finalBlockLogs: data.finalBlockLogs,
12321223
})
12331224

12341225
if (activeWorkflowId && !isExecutingFromChat) {
@@ -1255,7 +1246,6 @@ export function useWorkflowExecution() {
12551246
workflowId: activeWorkflowId,
12561247
executionId: executionIdRef.current,
12571248
durationMs: data?.duration,
1258-
finalBlockLogs: data?.finalBlockLogs,
12591249
})
12601250

12611251
if (activeWorkflowId && !isExecutingFromChat) {
@@ -1672,8 +1662,6 @@ export function useWorkflowExecution() {
16721662
accumulatedBlockLogs,
16731663
accumulatedBlockStates,
16741664
executedBlockIds,
1675-
consoleMode: 'update',
1676-
includeStartConsoleEntry: true,
16771665
})
16781666

16791667
await executionStream.executeFromBlock({
@@ -1743,7 +1731,6 @@ export function useWorkflowExecution() {
17431731
error: data.error,
17441732
durationMs: data.duration,
17451733
blockLogs: accumulatedBlockLogs,
1746-
finalBlockLogs: data.finalBlockLogs,
17471734
})
17481735

17491736
setCurrentExecutionId(workflowId, null)
@@ -1756,7 +1743,6 @@ export function useWorkflowExecution() {
17561743
workflowId,
17571744
executionId: executionIdRef.current,
17581745
durationMs: data?.duration,
1759-
finalBlockLogs: data?.finalBlockLogs,
17601746
})
17611747

17621748
setCurrentExecutionId(workflowId, null)
@@ -1903,8 +1889,6 @@ export function useWorkflowExecution() {
19031889
accumulatedBlockLogs,
19041890
accumulatedBlockStates,
19051891
executedBlockIds,
1906-
consoleMode: 'update',
1907-
includeStartConsoleEntry: true,
19081892
})
19091893

19101894
const capturedExecutionId = executionId
@@ -2005,7 +1989,6 @@ export function useWorkflowExecution() {
20051989
executionId: capturedExecutionId,
20061990
error: data.error,
20071991
blockLogs: accumulatedBlockLogs,
2008-
finalBlockLogs: data.finalBlockLogs,
20091992
})
20101993
},
20111994
onExecutionCancelled: (data) => {
@@ -2026,7 +2009,6 @@ export function useWorkflowExecution() {
20262009
workflowId: reconnectWorkflowId,
20272010
executionId: capturedExecutionId,
20282011
durationMs: data?.duration,
2029-
finalBlockLogs: data?.finalBlockLogs,
20302012
})
20312013
},
20322014
},

0 commit comments

Comments
 (0)