Skip to content

Commit 7c58ea8

Browse files
authored
remote source startup refactor (#489)
1 parent 41ca6c3 commit 7c58ea8

6 files changed

Lines changed: 158 additions & 33 deletions

File tree

crates/node/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ reth-node-types.workspace = true
5656
reth-network.workspace = true
5757
reth-network-api.workspace = true
5858
reth-network-p2p.workspace = true
59+
reth-provider.workspace = true
5960
reth-revm.workspace = true
6061
reth-rpc-api.workspace = true
6162
reth-rpc-eth-api.workspace = true
@@ -86,7 +87,6 @@ reth-db = { workspace = true, optional = true, features = ["test-utils"] }
8687
reth-e2e-test-utils = { workspace = true, optional = true }
8788
reth-engine-local = { workspace = true, optional = true }
8889
reth-fs-util = { workspace = true, optional = true }
89-
reth-provider = { workspace = true, optional = true }
9090
reth-rpc-layer = { workspace = true, optional = true }
9191
reth-rpc-server-types = { workspace = true, optional = true }
9292
reth-storage-api = { workspace = true, optional = true }

crates/node/src/add_ons/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,7 @@ where
171171
let remote_source = RemoteBlockSourceAddOn::new(
172172
remote_block_source_config,
173173
rollup_manager_handle.clone(),
174+
rpc_handle.provider().clone(),
174175
)
175176
.await?;
176177
ctx.node

crates/node/src/add_ons/remote_block_source.rs

Lines changed: 59 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,12 @@
33
44
use crate::args::RemoteBlockSourceArgs;
55
use alloy_primitives::Signature;
6-
use alloy_provider::{Provider, ProviderBuilder};
6+
use alloy_provider::{Provider, ProviderBuilder, RootProvider};
77
use alloy_rpc_client::RpcClient;
88
use alloy_transport::layers::RetryBackoffLayer;
99
use futures::StreamExt;
1010
use reth_network_api::{FullNetwork, PeerId};
11+
use reth_provider::BlockReader;
1112
use reth_scroll_node::ScrollNetworkPrimitives;
1213
use reth_tasks::shutdown::Shutdown;
1314
use reth_tokio_util::EventStream;
@@ -26,7 +27,12 @@ where
2627
/// Configuration for the remote block source.
2728
config: RemoteBlockSourceArgs,
2829
/// Handle to the chain orchestrator for sending commands.
29-
handle: ChainOrchestratorHandle<N>,
30+
orchestrator_handle: ChainOrchestratorHandle<N>,
31+
/// An event stream for listening to chain orchestrator events, used to wait for block build
32+
/// completion.
33+
events: EventStream<ChainOrchestratorEvent>,
34+
/// A provider for the remote node, used to fetch blocks and block information.
35+
remote: RootProvider<Scroll>,
3036
/// Tracks the last block number we imported from remote.
3137
/// This is different from local head because we build blocks on top of imports.
3238
last_imported_block: u64,
@@ -40,40 +46,72 @@ where
4046
pub async fn new(
4147
config: RemoteBlockSourceArgs,
4248
handle: ChainOrchestratorHandle<N>,
49+
provider: impl BlockReader,
4350
) -> eyre::Result<Self> {
44-
let last_imported_block = handle.status().await?.l2.fcs.head_block_info().number;
45-
Ok(Self { config, handle, last_imported_block })
46-
}
47-
48-
/// Runs the remote block source until shutdown.
49-
pub async fn run_until_shutdown(mut self, mut shutdown: Shutdown) -> eyre::Result<()> {
50-
let Some(url) = self.config.url.clone() else {
51+
// Build remote provider with retry layer.
52+
let Some(url) = config.url.clone() else {
5153
tracing::error!(target: "scroll::remote_source", "URL required when remote-source is enabled");
5254
return Err(eyre::eyre!("URL required when remote-source is enabled"));
5355
};
54-
55-
// Build remote provider with retry layer
5656
let retry_layer = RetryBackoffLayer::new(10, 100, 330);
5757
let client = RpcClient::builder().layer(retry_layer).http(url);
5858
let remote = ProviderBuilder::<_, _, Scroll>::default().connect_client(client);
5959

6060
// Get event listener for waiting on block completion
61-
let mut event_stream = match self.handle.get_event_listener().await {
61+
let events = match handle.get_event_listener().await {
6262
Ok(stream) => stream,
6363
Err(e) => {
6464
tracing::error!(target: "scroll::remote_source", ?e, "Failed to get event listener");
6565
return Err(eyre::eyre!(e));
6666
}
6767
};
6868

69+
// Determine the last imported block by finding the highest common block
70+
// between the local chain and the remote node.
71+
let local_head = provider.best_block_number()?;
72+
let remote_head = remote.get_block_number().await?;
73+
74+
let last_imported_block;
75+
let mut search = local_head.min(remote_head);
76+
loop {
77+
if search == 0 {
78+
// Genesis is always a common block (same chain spec assumed).
79+
last_imported_block = 0;
80+
break;
81+
}
82+
let local_hash = provider.block_hash(search)?;
83+
let remote_block = remote.get_block_by_number(search.into()).await?;
84+
match (local_hash, remote_block) {
85+
(Some(lh), Some(rb)) if lh == rb.header.hash => {
86+
last_imported_block = search;
87+
break;
88+
}
89+
_ => {
90+
search = search.saturating_sub(1);
91+
}
92+
}
93+
}
94+
tracing::info!(
95+
target: "scroll::remote_source",
96+
last_imported_block,
97+
local_head,
98+
remote_head,
99+
"Determined highest common block with remote"
100+
);
101+
102+
Ok(Self { config, orchestrator_handle: handle, events, remote, last_imported_block })
103+
}
104+
105+
/// Runs the remote block source until shutdown.
106+
pub async fn run_until_shutdown(mut self, mut shutdown: Shutdown) -> eyre::Result<()> {
69107
let mut poll_interval = interval(Duration::from_millis(self.config.poll_interval_ms));
70108

71109
loop {
72110
tokio::select! {
73111
biased;
74112
_guard = &mut shutdown => break,
75113
_ = poll_interval.tick() => {
76-
if let Err(e) = self.follow_and_build(&remote, &mut event_stream).await {
114+
if let Err(e) = self.follow_and_build().await {
77115
tracing::error!(target: "scroll::remote_source", ?e, "Sync error");
78116
}
79117
}
@@ -84,14 +122,11 @@ where
84122
}
85123

86124
/// Follows the remote node and builds blocks on top of imported blocks.
87-
async fn follow_and_build<P: Provider<Scroll>>(
88-
&mut self,
89-
remote: &P,
90-
event_stream: &mut EventStream<ChainOrchestratorEvent>,
91-
) -> eyre::Result<()> {
125+
async fn follow_and_build(&mut self) -> eyre::Result<()> {
92126
loop {
93127
// Get remote head
94-
let remote_block = remote
128+
let remote_block = self
129+
.remote
95130
.get_block_by_number(alloy_eips::BlockNumberOrTag::Latest)
96131
.full()
97132
.await?
@@ -117,7 +152,8 @@ where
117152

118153
// Fetch and import the next block from remote
119154
let next_block_num = self.last_imported_block + 1;
120-
let block = remote
155+
let block = self
156+
.remote
121157
.get_block_by_number(next_block_num.into())
122158
.full()
123159
.await?
@@ -134,7 +170,7 @@ where
134170

135171
// Import the block (this will cause a reorg if we had a locally built block at this
136172
// height)
137-
let chain_import = match self.handle.import_block(block_with_peer).await {
173+
let chain_import = match self.orchestrator_handle.import_block(block_with_peer).await {
138174
Ok(Ok(chain_import)) => {
139175
self.last_imported_block = next_block_num;
140176
chain_import
@@ -155,12 +191,12 @@ where
155191
}
156192

157193
// Trigger block building on top of the imported block
158-
self.handle.build_block();
194+
self.orchestrator_handle.build_block();
159195

160196
// Wait for BlockSequenced event
161197
tracing::debug!(target: "scroll::remote_source", "Waiting for block to be built...");
162198
loop {
163-
match event_stream.next().await {
199+
match self.events.next().await {
164200
Some(ChainOrchestratorEvent::BlockSequenced(block)) => {
165201
tracing::info!(target: "scroll::remote_source",
166202
block_number = block.header.number,

crates/node/src/test_utils/fixture.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ pub struct TestFixture {
6161
pub anvil: Option<anvil::NodeHandle>,
6262
/// The configuration for the nodes.
6363
pub config: ScrollRollupNodeConfig,
64+
/// Whether this fixture has a remote source node (always the last node).
65+
pub has_remote_source_node: bool,
6466
}
6567

6668
impl Debug for TestFixture {
@@ -70,6 +72,7 @@ impl Debug for TestFixture {
7072
.field("wallet", &"<Mutex<Wallet>>")
7173
.field("chain_spec", &self.chain_spec)
7274
.field("anvil", &self.anvil.is_some())
75+
.field("has_remote_source_node", &self.has_remote_source_node)
7376
.field("_tasks", &"<TaskManager>")
7477
.finish()
7578
}
@@ -703,7 +706,7 @@ impl TestFixtureBuilder {
703706
None
704707
};
705708

706-
let (mut nodes, dbs, wallet) = setup_engine(
709+
let (mut nodes, mut dbs, wallet) = setup_engine(
707710
self.config.clone(),
708711
self.num_nodes,
709712
chain_spec.clone(),
@@ -728,7 +731,7 @@ impl TestFixtureBuilder {
728731
// Use a fast poll interval for tests
729732
remote_config.remote_block_source_args.poll_interval_ms = 100;
730733

731-
let (mut remote_nodes, _, _) = setup_engine(
734+
let (mut remote_nodes, remote_dbs, _) = setup_engine(
732735
remote_config,
733736
1,
734737
chain_spec.clone(),
@@ -739,13 +742,15 @@ impl TestFixtureBuilder {
739742
.await?;
740743

741744
nodes.push(remote_nodes.pop().unwrap());
745+
dbs.extend(remote_dbs);
742746
}
743747

744-
let mut node_handles = Vec::with_capacity(nodes.len());
748+
let nodes_len = nodes.len();
749+
let mut node_handles = Vec::with_capacity(nodes_len);
745750
for (index, node) in nodes.into_iter().enumerate() {
746751
let typ = if self.config.sequencer_args.sequencer_enabled && index == 0 {
747752
NodeType::Sequencer
748-
} else if self.config.remote_block_source_args.enabled && index == node_handles.len() {
753+
} else if self.has_remote_source_node && index == nodes_len - 1 {
749754
NodeType::RemoteSource
750755
} else {
751756
NodeType::Follower
@@ -760,6 +765,7 @@ impl TestFixtureBuilder {
760765
chain_spec,
761766
anvil,
762767
config: self.config,
768+
has_remote_source_node: self.has_remote_source_node,
763769
})
764770
}
765771

crates/node/src/test_utils/reboot.rs

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -93,14 +93,43 @@ impl TestFixture {
9393

9494
tracing::info!("Starting node at index {} (reusing database)", node_index);
9595

96+
// Detect whether the node being restarted is the remote source (always the last index).
97+
let is_remote = self.has_remote_source_node && node_index == self.nodes.len() - 1;
98+
99+
let (config, reboot_node_idx) = if is_remote {
100+
// Reconstruct the remote config from the base config.
101+
// Node 0 (sequencer) must still be running so we can read its RPC port.
102+
let sequencer_port = self.nodes[0]
103+
.as_ref()
104+
.expect("sequencer must be running to restart remote source")
105+
.node
106+
.rpc_url()
107+
.port()
108+
.expect("sequencer RPC port must be set");
109+
let sequencer_url: reqwest::Url =
110+
format!("http://localhost:{}", sequencer_port).parse()?;
111+
112+
let mut remote_config = self.config.clone();
113+
remote_config.sequencer_args.sequencer_enabled = true;
114+
remote_config.sequencer_args.auto_start = false;
115+
remote_config.remote_block_source_args.enabled = true;
116+
remote_config.remote_block_source_args.url = Some(sequencer_url);
117+
remote_config.remote_block_source_args.poll_interval_ms = 100;
118+
119+
// Use reboot_node_idx=0 so setup_engine does NOT disable sequencer_enabled.
120+
(remote_config, 0usize)
121+
} else {
122+
(self.config.clone(), node_index)
123+
};
124+
96125
// Create node instance with existing database
97126
let (mut new_nodes, _, _) = setup_engine(
98-
self.config.clone(),
127+
config,
99128
1,
100129
self.chain_spec.clone(),
101130
true,
102131
false,
103-
Some((node_index, self.dbs[node_index].clone())),
132+
Some((reboot_node_idx, self.dbs[node_index].clone())),
104133
)
105134
.await?;
106135

@@ -109,10 +138,10 @@ impl TestFixture {
109138
}
110139

111140
let new_node = new_nodes.remove(0);
112-
let typ = if self.config.sequencer_args.sequencer_enabled && node_index == 0 {
113-
crate::test_utils::fixture::NodeType::Sequencer
114-
} else if self.config.remote_block_source_args.enabled && node_index == self.nodes.len() {
141+
let typ = if is_remote {
115142
crate::test_utils::fixture::NodeType::RemoteSource
143+
} else if self.config.sequencer_args.sequencer_enabled && node_index == 0 {
144+
crate::test_utils::fixture::NodeType::Sequencer
116145
} else {
117146
crate::test_utils::fixture::NodeType::Follower
118147
};

crates/node/tests/remote_block_source.rs

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,3 +23,56 @@ async fn test_remote_block_source() -> eyre::Result<()> {
2323

2424
Ok(())
2525
}
26+
27+
/// Test that the remote block source correctly determines its resume point on restart.
28+
///
29+
/// The remote source's local chain has blocks 1-3 (imported from sequencer) plus
30+
/// block 4 (built locally). The sequencer goes on to produce blocks 4-6. On restart,
31+
/// the highest-common-block walk must identify block 3 (locally-built block 4 diverges
32+
/// from sequencer's block 4) and import only blocks 4-6.
33+
///
34+
/// If the detection were broken (e.g. always returning 0), the remote source would try
35+
/// to re-import blocks 1-6, producing 6 `BlockSequenced` events before reaching blocks
36+
/// 5, 6, 7. This test asserts exactly three events in the correct order, confirming
37+
/// the resume point is block 3.
38+
#[allow(clippy::large_stack_frames)]
39+
#[tokio::test]
40+
async fn test_remote_block_source_resumes_from_correct_head() -> eyre::Result<()> {
41+
reth_tracing::init_test_tracing();
42+
43+
let mut fixture = TestFixture::builder().sequencer().remote_source_node().build().await?;
44+
45+
fixture.l1().sync().await?;
46+
47+
// Sequencer produces blocks 1-3; remote source imports each and builds on top.
48+
// After this phase the remote source local chain is: 1, 2, 3 (sequencer) + 4 (local).
49+
for i in 1..=3u64 {
50+
fixture.build_block().expect_block_number(i).build_and_await_block().await?;
51+
fixture.expect_event_on(1).block_sequenced(i + 1).await?;
52+
}
53+
54+
// Shut down the remote source node (index 1).
55+
fixture.shutdown_node(1).await?;
56+
57+
// Sequencer produces blocks 4-6 while the remote source is offline.
58+
for i in 4..=6u64 {
59+
fixture.build_block().expect_block_number(i).build_and_await_block().await?;
60+
}
61+
62+
// Restart the remote source.
63+
// Expected detection: local_head=4, remote_head=6, min=4.
64+
// Block 4: local hash (locally built) ≠ remote hash (sequencer's) → walk back.
65+
// Block 3: local hash == remote hash → last_imported_block = 3.
66+
// The add-on should therefore import blocks 4, 5, 6 and build 5, 6, 7 on top.
67+
fixture.start_node(1).await?;
68+
69+
// Synchronise L1 state on the restarted remote source node.
70+
fixture.l1().for_node(1).sync().await?;
71+
72+
// Verify the remote source catches up with the 3 missed sequencer blocks.
73+
fixture.expect_event_on(1).block_sequenced(5).await?;
74+
fixture.expect_event_on(1).block_sequenced(6).await?;
75+
fixture.expect_event_on(1).block_sequenced(7).await?;
76+
77+
Ok(())
78+
}

0 commit comments

Comments
 (0)