refactor(consensus): Move Insert Payload Off Task Queue#2533
refcell wants to merge 4 commits into `rf/refactor/direct-engine-build-payload`
Conversation
Move unsafe payload insertion into direct Engine methods and delete the InsertTask wrapper. Keep consolidation, delegated forkchoice, finalization, and seal on the existing queue. Co-authored-by: Codex <noreply@openai.com>
Apply nightly rustfmt import grouping and collapse the insert acknowledgement send check for clippy. Co-authored-by: Codex <noreply@openai.com>
Propagate local no-ack insert errors through the existing engine task severity handler. Avoid redundant V3/V4 insert payload clones and clarify the newPayload duration log field. Co-authored-by: Codex <noreply@openai.com>
Reshape the unsafe-payload processor rstest table around a case struct so the generated test does not trip clippy's argument limit. Correct the service README's description of when the local insert acknowledgement is sent versus when the sequencer client waits on the unsafe-head watch channel. Co-authored-by: Codex <noreply@openai.com>
```rust
/// Inserts a payload and retries temporary failures.
pub async fn insert_payload_with_retry(
    &mut self,
    client: Arc<EngineClient_>,
    config: Arc<RollupConfig>,
    envelope: BaseExecutionPayloadEnvelope,
    payload_safety: InsertPayloadSafety,
) -> InsertTaskResult {
    let _task_timer =
        base_metrics::timed!(Metrics::engine_task_duration(Metrics::INSERT_TASK_LABEL));

    loop {
        match Self::insert_payload_with_state(
            &mut self.state,
            Arc::clone(&client),
            Arc::clone(&config),
            envelope.clone(),
            payload_safety,
            false,
        )
        .await
        {
            Ok(inserted_head) => {
                self.state_sender.send_replace(self.state);
                Metrics::engine_task_count(Metrics::INSERT_TASK_LABEL).increment(1);
                return Ok(inserted_head);
            }
            Err(err) => {
                let severity = err.severity();
                Metrics::engine_task_failure(Metrics::INSERT_TASK_LABEL, severity.as_label())
                    .increment(1);

                match severity {
                    EngineTaskErrorSeverity::Temporary => {
                        trace!(target: "engine", error = %err, "Temporary engine error");
                        yield_now().await;
                    }
                    EngineTaskErrorSeverity::Critical => {
                        error!(target: "engine", error = %err, "Critical engine error");
                        return Err(err);
                    }
                    EngineTaskErrorSeverity::Reset => {
                        warn!(target: "engine", "Engine requested derivation reset");
                        return Err(err);
                    }
                    EngineTaskErrorSeverity::Flush => {
                        warn!(target: "engine", "Engine requested derivation flush");
                        return Err(err);
                    }
                }
            }
        }
    }
}
```
insert_payload_with_retry holds &mut self (exclusive access to Engine) for the entire retry loop. Unlike the old InsertTask on the task queue — where the engine's drain() loop could process other higher-priority tasks between retries — this loop blocks all other engine operations (enqueue, drain, build, get_payload, reset) until the insert either succeeds or hits a non-temporary error.
If the EL returns Temporary errors persistently (e.g. reth is temporarily overloaded), this stalls all engine activity. The yield_now() only yields the tokio task, not Engine exclusivity — the caller in EngineProcessor::start() cannot process any new requests from the request_channel until this method returns.
This may be intentional given the PR's goal of taking inserts off the task queue, but it's a behavioral change from the old architecture worth confirming. In the old design, a Temporary insert failure would cause drain() to return, letting the processor's main loop process new requests (builds, consolidations, etc.) before retrying on the next iteration.
```rust
let time_start = Instant::now();
let BaseExecutionPayloadEnvelope { parent_beacon_block_root, execution_payload } = envelope;
let parent_beacon_block_root = parent_beacon_block_root.unwrap_or_default();
let new_payload_start = Instant::now();
```
time_start and new_payload_start are assigned on consecutive lines and will always hold nearly identical values. The distinction between total_duration and new_payload_duration logged at lines 410-411 is therefore misleading: total_duration includes the SynchronizeTask FCU, but it differs from new_payload_duration at the start only by a few nanoseconds of destructuring overhead.
This is pre-existing from the old InsertTask, but since this method is being refactored anyway, consider simplifying to a single timer or renaming for clarity (e.g. new_payload_rpc_duration vs total_insert_duration).
```rust
let (response, block): (_, BaseBlock) = match execution_payload {
    BaseExecutionPayload::V1(payload) => {
        let block = BaseExecutionPayload::V1(payload.clone())
            .try_into_block()
            .map_err(InsertTaskError::FromBlockError)?;
        let payload_input =
            ExecutionPayloadInputV2 { execution_payload: payload, withdrawals: None };
        (client.new_payload_v2(payload_input).await, block)
    }
    BaseExecutionPayload::V2(payload) => {
        let block = BaseExecutionPayload::V2(payload.clone())
            .try_into_block()
            .map_err(InsertTaskError::FromBlockError)?;
        let payload_input = ExecutionPayloadInputV2 {
            execution_payload: payload.payload_inner,
            withdrawals: Some(payload.withdrawals),
        };
        (client.new_payload_v2(payload_input).await, block)
    }
    BaseExecutionPayload::V3(payload) => {
        let block = BaseExecutionPayload::V3(payload.clone())
            .try_into_block_with_sidecar(&BaseExecutionPayloadSidecar::v3(
                CancunPayloadFields::new(parent_beacon_block_root, vec![]),
            ))
            .map_err(InsertTaskError::FromBlockError)?;
        (client.new_payload_v3(payload, parent_beacon_block_root).await, block)
    }
    BaseExecutionPayload::V4(payload) => {
        let block = BaseExecutionPayload::V4(payload.clone())
            .try_into_block_with_sidecar(&BaseExecutionPayloadSidecar::v4(
                CancunPayloadFields::new(parent_beacon_block_root, vec![]),
                PraguePayloadFields::new(EMPTY_REQUESTS_HASH),
            ))
            .map_err(InsertTaskError::FromBlockError)?;
        (client.new_payload_v4(payload, parent_beacon_block_root).await, block)
    }
};
```
In the V1 and V2 arms, the payload is cloned to build the block, then the original is moved into the RPC input — one clone per arm, which is fine. But in the V3 and V4 arms, payload is already an owned value from the match destructure, yet .clone() is called to build the block (lines 347, 355), and then the original is consumed by the RPC call. This means V3/V4 payloads are cloned once here unnecessarily — the block could be built first from a reference or the RPC input could be constructed from the block.
Since insert_payload_with_retry already clones envelope on every retry iteration (line 275), minimizing per-call clones of full transaction lists is worthwhile. Consider building the block from a borrow of payload before consuming it:
```rust
BaseExecutionPayload::V3(payload) => {
    let sidecar = BaseExecutionPayloadSidecar::v3(
        CancunPayloadFields::new(parent_beacon_block_root, vec![]),
    );
    let block = BaseExecutionPayload::V3(payload.clone())
        .try_into_block_with_sidecar(&sidecar)
        .map_err(InsertTaskError::FromBlockError)?;
    (client.new_payload_v3(payload, parent_beacon_block_root).await, block)
}
```

Or restructure so the block conversion happens before the owned payload is consumed by the RPC call, avoiding the clone entirely if try_into_block_with_sidecar can take &self.
Review Summary

The refactoring moves payload insertion off the task queue and onto direct Engine methods. The findings above cover the retry-loop exclusivity, the timer naming, and the redundant V3/V4 payload clones.
Summary
This moves unsafe payload insertion out of the engine task queue and onto direct Engine methods. The remaining queue now only owns seal, consolidate, delegated forkchoice, and finalize work, while sequencer insert acknowledgement still waits for the inserted unsafe head. External unsafe inserts keep the existing retry and reset/flush handling through the processor's task-error path.