diff --git a/timely/src/dataflow/scope.rs b/timely/src/dataflow/scope.rs index fb922a988..eceed1686 100644 --- a/timely/src/dataflow/scope.rs +++ b/timely/src/dataflow/scope.rs @@ -116,12 +116,7 @@ impl<'scope, T: Timestamp> Scope<'scope, T> { let path = slot.addr(); let identifier = slot.identifier(); - let type_name = std::any::type_name::(); - let summary_logging = self.worker().logger_for(&format!("timely/summary/{type_name}")); - - let subgraph = RefCell::new(SubgraphBuilder::new_from( - path, identifier, self.worker().logging(), summary_logging, name, - )); + let subgraph = RefCell::new(SubgraphBuilder::new_from(path, identifier, name)); let child = Scope { subgraph: &subgraph, worker: self.worker }; diff --git a/timely/src/progress/subgraph.rs b/timely/src/progress/subgraph.rs index 7fb289f97..393223f53 100644 --- a/timely/src/progress/subgraph.rs +++ b/timely/src/progress/subgraph.rs @@ -52,8 +52,8 @@ where /// A global identifier for this subgraph. identifier: usize, - // handles to the children of the scope. index i corresponds to entry i-1, unless things change. - children: Vec>, + // Deferred children: (operator, index, identifier). Built into PerOperatorState at build time. + children: Vec<(Box>, usize, usize)>, child_count: usize, edge_stash: Vec<(Source, Target)>, @@ -64,10 +64,6 @@ where // expressed capabilities, used to filter changes against. outputs: usize, - /// Logging handle - logging: Option, - /// Typed logging handle for operator summaries. - summary_logging: Option>, } impl SubgraphBuilder @@ -99,14 +95,10 @@ where pub fn new_from( path: Rc<[usize]>, identifier: usize, - logging: Option, - summary_logging: Option>, name: &str, ) -> SubgraphBuilder { - // Put an empty placeholder for "outer scope" representative. - let children = vec![PerOperatorState::empty(0, 0)]; let index = path[path.len() - 1]; SubgraphBuilder { @@ -114,13 +106,11 @@ where path, index, identifier, - children, + children: Vec::new(), child_count: 1, edge_stash: Vec::new(), input_messages: Vec::new(), outputs: 0, - logging, - summary_logging, } } @@ -131,20 +121,10 @@ where } /// Adds a new child to the subgraph. + /// + /// The child will be initialized and logged when [`build`] is called. pub fn add_child(&mut self, child: Box>, index: usize, identifier: usize) { - let child = PerOperatorState::new(child, index, identifier, self.logging.clone(), &mut self.summary_logging); - if let Some(l) = &mut self.logging { - let mut child_path = Vec::with_capacity(self.path.len() + 1); - child_path.extend_from_slice(&self.path[..]); - child_path.push(index); - - l.log(crate::logging::OperatesEvent { - id: identifier, - addr: child_path, - name: child.name.to_owned(), - }); - } - self.children.push(child); + self.children.push((child, index, identifier)); } /// Now that initialization is complete, actually build a subgraph. @@ -154,41 +134,57 @@ where // we also need to determine what to return as a summary and initial capabilities, which // will depend on child summaries and capabilities, as well as edges in the subgraph. - // perhaps first check that the children are sanely identified - self.children.sort_unstable_by(|x,y| x.index.cmp(&y.index)); - assert!(self.children.iter().enumerate().all(|(i,x)| i == x.index)); - let inputs = self.input_messages.len(); let outputs = self.outputs; - // Create empty child zero representative. - self.children[0] = PerOperatorState::empty(outputs, inputs); + let type_name = std::any::type_name::(); + let mut logging = worker.logging(); + let mut summary_logging = worker.logger_for(&format!("timely/summary/{type_name}")); + + // Sort stashed children by index, and preface with a child zero mirroring the subgraph shape. + self.children.sort_unstable_by_key(|&(_, index, _)| index); + let mut children: Vec<_> = [PerOperatorState::empty(outputs, inputs)] + .into_iter() + .chain(self.children.into_iter().map(|(operator, index, identifier)| { + let child = PerOperatorState::new(operator, index, identifier, logging.clone(), &mut summary_logging); + if let Some(l) = &mut logging { + let mut child_path = Vec::with_capacity(self.path.len() + 1); + child_path.extend_from_slice(&self.path[..]); + child_path.push(index); + l.log(crate::logging::OperatesEvent { + id: identifier, + addr: child_path, + name: child.name.to_owned(), + }); + } + child + })) + .collect(); + assert!(children.iter().enumerate().all(|(i,x)| i == x.index)); let mut builder = reachability::Builder::new(); // Child 0 has `inputs` outputs and `outputs` inputs, not yet connected. let summary = (0..outputs).map(|_| PortConnectivity::default()).collect(); builder.add_node(0, outputs, inputs, summary); - for (index, child) in self.children.iter().enumerate().skip(1) { + for (index, child) in children.iter().enumerate().skip(1) { builder.add_node(index, child.inputs, child.outputs, child.internal_summary.clone()); } for (source, target) in self.edge_stash { - self.children[source.node].edges[source.port].push(target); + children[source.node].edges[source.port].push(target); builder.add_edge(source, target); } - // The `None` argument is optional logging infrastructure. - let type_name = std::any::type_name::(); let reachability_logging = worker.logger_for(&format!("timely/reachability/{type_name}")) .map(|logger| reachability::logging::TrackerLogger::new(self.identifier, logger)); let progress_logging = worker.logger_for(&format!("timely/progress/{type_name}")); let (tracker, scope_summary) = builder.build(reachability_logging); - let progcaster = Progcaster::new(worker, Rc::clone(&self.path), self.identifier, self.logging.clone(), progress_logging); + let progcaster = Progcaster::new(worker, Rc::clone(&self.path), self.identifier, logging, progress_logging); - let mut incomplete = vec![true; self.children.len()]; + let mut incomplete = vec![true; children.len()]; incomplete[0] = false; let incomplete_count = incomplete.len() - 1; @@ -197,7 +193,7 @@ where activations.borrow_mut().activate(&self.path[..]); // The subgraph's per-input interest is conservatively the max across all children's inputs. - let max_interest = self.children.iter() + let max_interest = children.iter() .flat_map(|c| c.notify.iter().copied()) .max() .unwrap_or(FrontierInterest::Never); @@ -213,7 +209,7 @@ where activations, temp_active: BinaryHeap::new(), maybe_shutdown: Vec::new(), - children: self.children, + children, input_messages: self.input_messages, output_capabilities: vec![MutableAntichain::new(); self.outputs], diff --git a/timely/src/worker.rs b/timely/src/worker.rs index c2170d281..2ec4fbeb6 100644 --- a/timely/src/worker.rs +++ b/timely/src/worker.rs @@ -621,9 +621,7 @@ impl Worker { let addr = vec![dataflow_index].into(); let identifier = self.new_identifier(); - let type_name = std::any::type_name::(); - let summary_logging = self.logger_for(&format!("timely/summary/{}", type_name)); - let subscope = SubgraphBuilder::new_from(addr, identifier, logging.clone(), summary_logging, name); + let subscope = SubgraphBuilder::new_from(addr, identifier, name); let subscope = RefCell::new(subscope); let result = {