Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions timely/src/dataflow/scope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,7 @@ impl<'scope, T: Timestamp> Scope<'scope, T> {
let path = slot.addr();
let identifier = slot.identifier();

let type_name = std::any::type_name::<T2>();
let summary_logging = self.worker().logger_for(&format!("timely/summary/{type_name}"));

let subgraph = RefCell::new(SubgraphBuilder::new_from(
path, identifier, self.worker().logging(), summary_logging, name,
));
let subgraph = RefCell::new(SubgraphBuilder::new_from(path, identifier, name));

let child = Scope { subgraph: &subgraph, worker: self.worker };

Expand Down
76 changes: 36 additions & 40 deletions timely/src/progress/subgraph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@
/// A global identifier for this subgraph.
identifier: usize,

// handles to the children of the scope. index i corresponds to entry i-1, unless things change.
children: Vec<PerOperatorState<TInner>>,
// Deferred children: (operator, index, identifier). Built into PerOperatorState at build time.
children: Vec<(Box<dyn Operate<TInner>>, usize, usize)>,
child_count: usize,

edge_stash: Vec<(Source, Target)>,
Expand All @@ -64,10 +64,6 @@
// expressed capabilities, used to filter changes against.
outputs: usize,

/// Logging handle
logging: Option<Logger>,
/// Typed logging handle for operator summaries.
summary_logging: Option<SummaryLogger<TInner::Summary>>,
}

impl<TInner> SubgraphBuilder<TInner>
Expand Down Expand Up @@ -99,28 +95,22 @@
pub fn new_from(
path: Rc<[usize]>,
identifier: usize,
logging: Option<Logger>,
summary_logging: Option<SummaryLogger<TInner::Summary>>,
name: &str,
)
-> SubgraphBuilder<TInner>
{
// Put an empty placeholder for "outer scope" representative.
let children = vec![PerOperatorState::empty(0, 0)];
let index = path[path.len() - 1];

SubgraphBuilder {
name: name.to_owned(),
path,
index,
identifier,
children,
children: Vec::new(),
child_count: 1,
edge_stash: Vec::new(),
input_messages: Vec::new(),
outputs: 0,
logging,
summary_logging,
}
}

Expand All @@ -131,20 +121,10 @@
}

/// Adds a new child to the subgraph.
///
/// The child will be initialized and logged when [`build`] is called.
pub fn add_child(&mut self, child: Box<dyn Operate<TInner>>, index: usize, identifier: usize) {
let child = PerOperatorState::new(child, index, identifier, self.logging.clone(), &mut self.summary_logging);
if let Some(l) = &mut self.logging {
let mut child_path = Vec::with_capacity(self.path.len() + 1);
child_path.extend_from_slice(&self.path[..]);
child_path.push(index);

l.log(crate::logging::OperatesEvent {
id: identifier,
addr: child_path,
name: child.name.to_owned(),
});
}
self.children.push(child);
self.children.push((child, index, identifier));
}

/// Now that initialization is complete, actually build a subgraph.
Expand All @@ -154,41 +134,57 @@
// we also need to determine what to return as a summary and initial capabilities, which
// will depend on child summaries and capabilities, as well as edges in the subgraph.

// perhaps first check that the children are sanely identified
self.children.sort_unstable_by(|x,y| x.index.cmp(&y.index));
assert!(self.children.iter().enumerate().all(|(i,x)| i == x.index));

let inputs = self.input_messages.len();
let outputs = self.outputs;

// Create empty child zero representative.
self.children[0] = PerOperatorState::empty(outputs, inputs);
let type_name = std::any::type_name::<TInner>();
let mut logging = worker.logging();
let mut summary_logging = worker.logger_for(&format!("timely/summary/{type_name}"));

// Sort stashed children by index, and preface with a child zero mirroring the subgraph shape.
self.children.sort_unstable_by_key(|&(_, index, _)| index);
let mut children: Vec<_> = [PerOperatorState::empty(outputs, inputs)]
.into_iter()
.chain(self.children.into_iter().map(|(operator, index, identifier)| {
let child = PerOperatorState::new(operator, index, identifier, logging.clone(), &mut summary_logging);
if let Some(l) = &mut logging {
let mut child_path = Vec::with_capacity(self.path.len() + 1);
child_path.extend_from_slice(&self.path[..]);
child_path.push(index);
l.log(crate::logging::OperatesEvent {
id: identifier,
addr: child_path,
name: child.name.to_owned(),
});
}
child
}))
.collect();
assert!(children.iter().enumerate().all(|(i,x)| i == x.index));

let mut builder = reachability::Builder::new();

// Child 0 has `inputs` outputs and `outputs` inputs, not yet connected.
let summary = (0..outputs).map(|_| PortConnectivity::default()).collect();
builder.add_node(0, outputs, inputs, summary);
for (index, child) in self.children.iter().enumerate().skip(1) {
for (index, child) in children.iter().enumerate().skip(1) {
builder.add_node(index, child.inputs, child.outputs, child.internal_summary.clone());
}

for (source, target) in self.edge_stash {
self.children[source.node].edges[source.port].push(target);
children[source.node].edges[source.port].push(target);
builder.add_edge(source, target);
}

// The `None` argument is optional logging infrastructure.
let type_name = std::any::type_name::<TInner>();
let reachability_logging =
worker.logger_for(&format!("timely/reachability/{type_name}"))
.map(|logger| reachability::logging::TrackerLogger::new(self.identifier, logger));
let progress_logging = worker.logger_for(&format!("timely/progress/{type_name}"));
let (tracker, scope_summary) = builder.build(reachability_logging);

let progcaster = Progcaster::new(worker, Rc::clone(&self.path), self.identifier, self.logging.clone(), progress_logging);
let progcaster = Progcaster::new(worker, Rc::clone(&self.path), self.identifier, logging, progress_logging);

let mut incomplete = vec![true; self.children.len()];
let mut incomplete = vec![true; children.len()];
incomplete[0] = false;
let incomplete_count = incomplete.len() - 1;

Expand All @@ -197,7 +193,7 @@
activations.borrow_mut().activate(&self.path[..]);

// The subgraph's per-input interest is conservatively the max across all children's inputs.
let max_interest = self.children.iter()
let max_interest = children.iter()
.flat_map(|c| c.notify.iter().copied())
.max()
.unwrap_or(FrontierInterest::Never);
Expand All @@ -213,7 +209,7 @@
activations,
temp_active: BinaryHeap::new(),
maybe_shutdown: Vec::new(),
children: self.children,
children,
input_messages: self.input_messages,
output_capabilities: vec![MutableAntichain::new(); self.outputs],

Expand Down Expand Up @@ -792,7 +788,7 @@
for (time, diff) in internal.iter() {
if *diff > 0 {
let consumed = shared_progress.consumeds.iter_mut().any(|x| x.iter().any(|(t,d)| *d > 0 && t.less_equal(time)));
let internal = child_state.sources[output].implications.less_equal(time);

Check warning on line 791 in timely/src/progress/subgraph.rs

View workflow job for this annotation

GitHub Actions / Cargo clippy

`internal` shadows a previous, unrelated binding
if !consumed && !internal {
println!("Increment at {:?}, not supported by\n\tconsumed: {:?}\n\tinternal: {:?}", time, shared_progress.consumeds, child_state.sources[output].implications);
panic!("Progress error; internal {:?}", self.name);
Expand Down
4 changes: 1 addition & 3 deletions timely/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -621,9 +621,7 @@ impl Worker {
let addr = vec![dataflow_index].into();
let identifier = self.new_identifier();

let type_name = std::any::type_name::<T>();
let summary_logging = self.logger_for(&format!("timely/summary/{}", type_name));
let subscope = SubgraphBuilder::new_from(addr, identifier, logging.clone(), summary_logging, name);
let subscope = SubgraphBuilder::new_from(addr, identifier, name);
let subscope = RefCell::new(subscope);

let result = {
Expand Down
Loading