Skip to content

refactor(consensus): Move Insert Payload Off Task Queue#2533

Draft
refcell wants to merge 4 commits intorf/refactor/direct-engine-build-payloadfrom
rf/refactor/direct-engine-insert
Draft

refactor(consensus): Move Insert Payload Off Task Queue#2533
refcell wants to merge 4 commits intorf/refactor/direct-engine-build-payloadfrom
rf/refactor/direct-engine-insert

Conversation

@refcell
Copy link
Copy Markdown
Contributor

@refcell refcell commented May 5, 2026

Summary

This moves unsafe payload insertion out of the engine task queue and onto direct Engine methods. The remaining queue now only owns seal, consolidate, delegated forkchoice, and finalize work, while sequencer insert acknowledgement still waits for the inserted unsafe head. External unsafe inserts keep the existing retry and reset/flush handling through the processor's task-error path.

Move unsafe payload insertion into direct Engine methods and delete the InsertTask wrapper. Keep consolidation, delegated forkchoice, finalization, and seal on the existing queue.

Co-authored-by: Codex <noreply@openai.com>
@vercel
Copy link
Copy Markdown

vercel Bot commented May 5, 2026

The latest updates on your projects. Learn more about Vercel for GitHub.

1 Skipped Deployment
Project Deployment Actions Updated (UTC)
base Ignored Ignored Preview May 5, 2026 4:48pm

Request Review

Comment thread crates/consensus/engine/src/task_queue/core.rs
Comment thread crates/consensus/engine/src/task_queue/core.rs Outdated
Apply nightly rustfmt import grouping and collapse the insert acknowledgement send check for clippy.

Co-authored-by: Codex <noreply@openai.com>
@refcell refcell added consensus Area: consensus stacked Meta: Stacked PR labels May 5, 2026
@refcell refcell self-assigned this May 5, 2026
Comment thread crates/consensus/engine/src/task_queue/core.rs Outdated
refcell and others added 2 commits May 5, 2026 12:36
Propagate local no-ack insert errors through the existing engine task severity handler. Avoid redundant V3/V4 insert payload clones and clarify the newPayload duration log field.

Co-authored-by: Codex <noreply@openai.com>
Reshape the unsafe-payload processor rstest table around a case struct so the generated test does not trip clippy's argument limit. Correct the service README's description of when the local insert acknowledgement is sent versus when the sequencer client waits on the unsafe-head watch channel.

Co-authored-by: Codex <noreply@openai.com>
Comment on lines +259 to +312
/// Inserts a payload and retries temporary failures.
pub async fn insert_payload_with_retry(
&mut self,
client: Arc<EngineClient_>,
config: Arc<RollupConfig>,
envelope: BaseExecutionPayloadEnvelope,
payload_safety: InsertPayloadSafety,
) -> InsertTaskResult {
let _task_timer =
base_metrics::timed!(Metrics::engine_task_duration(Metrics::INSERT_TASK_LABEL));

loop {
match Self::insert_payload_with_state(
&mut self.state,
Arc::clone(&client),
Arc::clone(&config),
envelope.clone(),
payload_safety,
false,
)
.await
{
Ok(inserted_head) => {
self.state_sender.send_replace(self.state);
Metrics::engine_task_count(Metrics::INSERT_TASK_LABEL).increment(1);
return Ok(inserted_head);
}
Err(err) => {
let severity = err.severity();
Metrics::engine_task_failure(Metrics::INSERT_TASK_LABEL, severity.as_label())
.increment(1);

match severity {
EngineTaskErrorSeverity::Temporary => {
trace!(target: "engine", error = %err, "Temporary engine error");
yield_now().await;
}
EngineTaskErrorSeverity::Critical => {
error!(target: "engine", error = %err, "Critical engine error");
return Err(err);
}
EngineTaskErrorSeverity::Reset => {
warn!(target: "engine", "Engine requested derivation reset");
return Err(err);
}
EngineTaskErrorSeverity::Flush => {
warn!(target: "engine", "Engine requested derivation flush");
return Err(err);
}
}
}
}
}
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

insert_payload_with_retry holds &mut self (exclusive access to Engine) for the entire retry loop. Unlike the old InsertTask on the task queue — where the engine's drain() loop could process other higher-priority tasks between retries — this loop blocks all other engine operations (enqueue, drain, build, get_payload, reset) until the insert either succeeds or hits a non-temporary error.

If the EL returns Temporary errors persistently (e.g. reth is temporarily overloaded), this stalls all engine activity. The yield_now() only yields the tokio task, not Engine exclusivity — the caller in EngineProcessor::start() cannot process any new requests from the request_channel until this method returns.

This may be intentional given the PR's goal of taking inserts off the task queue, but it's a behavioral change from the old architecture worth confirming. In the old design, a Temporary insert failure would cause drain() to return, letting the processor's main loop process new requests (builds, consolidations, etc.) before retrying on the next iteration.

Comment on lines +323 to +326
let time_start = Instant::now();
let BaseExecutionPayloadEnvelope { parent_beacon_block_root, execution_payload } = envelope;
let parent_beacon_block_root = parent_beacon_block_root.unwrap_or_default();
let new_payload_start = Instant::now();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

time_start and new_payload_start are assigned on consecutive lines and will always have nearly identical values. The distinction between total_duration and new_payload_duration logged at line 410-411 is misleading — total_duration includes the SynchronizeTask FCU but is offset from new_payload_duration by only a few nanoseconds of destructuring overhead.

This is pre-existing from the old InsertTask, but since this method is being refactored anyway, consider simplifying to a single timer or renaming for clarity (e.g. new_payload_rpc_duration vs total_insert_duration).

Comment on lines +327 to +363
let (response, block): (_, BaseBlock) = match execution_payload {
BaseExecutionPayload::V1(payload) => {
let block = BaseExecutionPayload::V1(payload.clone())
.try_into_block()
.map_err(InsertTaskError::FromBlockError)?;
let payload_input =
ExecutionPayloadInputV2 { execution_payload: payload, withdrawals: None };
(client.new_payload_v2(payload_input).await, block)
}
BaseExecutionPayload::V2(payload) => {
let block = BaseExecutionPayload::V2(payload.clone())
.try_into_block()
.map_err(InsertTaskError::FromBlockError)?;
let payload_input = ExecutionPayloadInputV2 {
execution_payload: payload.payload_inner,
withdrawals: Some(payload.withdrawals),
};
(client.new_payload_v2(payload_input).await, block)
}
BaseExecutionPayload::V3(payload) => {
let block = BaseExecutionPayload::V3(payload.clone())
.try_into_block_with_sidecar(&BaseExecutionPayloadSidecar::v3(
CancunPayloadFields::new(parent_beacon_block_root, vec![]),
))
.map_err(InsertTaskError::FromBlockError)?;
(client.new_payload_v3(payload, parent_beacon_block_root).await, block)
}
BaseExecutionPayload::V4(payload) => {
let block = BaseExecutionPayload::V4(payload.clone())
.try_into_block_with_sidecar(&BaseExecutionPayloadSidecar::v4(
CancunPayloadFields::new(parent_beacon_block_root, vec![]),
PraguePayloadFields::new(EMPTY_REQUESTS_HASH),
))
.map_err(InsertTaskError::FromBlockError)?;
(client.new_payload_v4(payload, parent_beacon_block_root).await, block)
}
};
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the V1 and V2 arms, the payload is cloned to build the block, then the original is moved into the RPC input — one clone per arm, which is fine. But in the V3 and V4 arms, payload is already an owned value from the match destructure, yet .clone() is called to build the block (lines 347, 355), and then the original is consumed by the RPC call. This means V3/V4 payloads are cloned once here unnecessarily — the block could be built first from a reference or the RPC input could be constructed from the block.

Since insert_payload_with_retry already clones envelope on every retry iteration (line 275), minimizing per-call clones of full transaction lists is worthwhile. Consider building the block from a borrow of payload before consuming it:

BaseExecutionPayload::V3(payload) => {
    let sidecar = BaseExecutionPayloadSidecar::v3(
        CancunPayloadFields::new(parent_beacon_block_root, vec![]),
    );
    let block = BaseExecutionPayload::V3(payload.clone())
        .try_into_block_with_sidecar(&sidecar)
        .map_err(InsertTaskError::FromBlockError)?;
    (client.new_payload_v3(payload, parent_beacon_block_root).await, block)
}

Or restructure so the block conversion happens before the owned payload is consumed by the RPC call, avoiding the clone entirely if try_into_block_with_sidecar can take &self.

@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented May 5, 2026

Review Summary

The refactoring moves payload insertion from the InsertTask task queue paradigm to direct Engine methods (insert_unsafe_payload, insert_local_unsafe_payload, insert_payload_with_retry). The SealTask and build_and_seal utility correctly call insert_payload_with_state directly. Error handling in handle_local_unsafe_l2_block and handle_external_unsafe_l2_block is correctly wired — errors from the None result_tx path are properly routed through handle_engine_task_error. Tests are thorough, covering V1/V2 payload versioning, safe/unsafe head advancement, and new bootstrap role logic.

Findings

  1. insert_payload_with_retry holds &mut Engine for the entire retry loop (core.rs:259-312) — This is a behavioral change from the old architecture. Previously, a Temporary insert failure would cause drain() to return, letting the processor's main loop handle new requests before retrying. Now the exclusive &mut self borrow blocks all engine operations during the retry loop. Worth confirming this doesn't cause stalls under persistent EL temporary errors.

  2. Redundant consecutive timers in insert_payload_with_state (core.rs:323-326) — time_start and new_payload_start are assigned on adjacent lines, making total_duration vs new_payload_duration nearly identical. Pre-existing from the old InsertTask but worth cleaning up since this method is being refactored.

  3. Unnecessary payload clones in V3/V4 arms (core.rs:327-363) — V3/V4 match arms clone the payload to build the block, then consume the original in the RPC call. The clone could be avoided by restructuring the build-before-consume order. Also pre-existing, but relevant since insert_payload_with_retry already clones the envelope on each retry iteration.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

consensus Area: consensus stacked Meta: Stacked PR

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant