diff --git a/Cargo.lock b/Cargo.lock index 54d1a06c0ac17..68304f933604d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1687,7 +1687,7 @@ checksum = "0e25ea47919b1560c4e3b7fe0aaab9becf5b84a10325ddf7db0f0ba5e1026499" [[package]] name = "differential-dataflow" version = "0.12.0" -source = "git+https://github.com/TimelyDataflow/differential-dataflow.git#c2e8fefce9ddad0aef5afcac0238b4dc6ae0ddcb" +source = "git+https://github.com/TimelyDataflow/differential-dataflow.git#8a04699df22600a332a5fafadbe7eebb84ea16d5" dependencies = [ "abomonation", "abomonation_derive", @@ -1764,7 +1764,7 @@ checksum = "923dea538cea0aa3025e8685b20d6ee21ef99c4f77e954a30febbaac5ec73a97" [[package]] name = "dogsdogsdogs" version = "0.1.0" -source = "git+https://github.com/TimelyDataflow/differential-dataflow.git#c2e8fefce9ddad0aef5afcac0238b4dc6ae0ddcb" +source = "git+https://github.com/TimelyDataflow/differential-dataflow.git#8a04699df22600a332a5fafadbe7eebb84ea16d5" dependencies = [ "abomonation", "abomonation_derive", diff --git a/src/compute/src/render/top_k.rs b/src/compute/src/render/top_k.rs index 301524da937c8..325faf7894746 100644 --- a/src/compute/src/render/top_k.rs +++ b/src/compute/src/render/top_k.rs @@ -11,27 +11,31 @@ //! //! Consult [TopKPlan] documentation for details. +use std::cell::RefCell; use std::collections::HashMap; +use std::rc::Rc; use differential_dataflow::hashable::Hashable; use differential_dataflow::lattice::Lattice; -use differential_dataflow::operators::arrange::ArrangeBySelf; +use differential_dataflow::operators::arrange::{Arrange, Arranged, TraceAgent}; use differential_dataflow::operators::reduce::ReduceCore; use differential_dataflow::operators::Consolidate; use differential_dataflow::trace::implementations::ord::OrdValSpine; use differential_dataflow::AsCollection; use differential_dataflow::Collection; -use timely::dataflow::channels::pact::Pipeline; +use timely::dataflow::channels::pact::{Exchange, ParallelizationContract, Pipeline}; use timely::dataflow::operators::Operator; +use timely::dataflow::stream::Stream; use timely::dataflow::Scope; use mz_compute_client::plan::top_k::{ BasicTopKPlan, MonotonicTop1Plan, MonotonicTopKPlan, TopKPlan, }; -use mz_repr::{Diff, Row}; +use mz_repr::{DatumVec, Diff, Row}; use crate::render::context::CollectionBundle; use crate::render::context::Context; +use crate::typedefs::RowSpine; // The implementation requires integer timestamps to be able to delay feedback for monotonic inputs. impl Context @@ -303,7 +307,6 @@ where // We can place our rows directly into the diff field, and only keep the relevant one // corresponding to evaluating our aggregate, instead of having to do a hierarchical // reduction. - use timely::dataflow::operators::Map; let collection = collection.map({ let mut datum_vec = mz_repr::DatumVec::new(); @@ -320,40 +323,94 @@ where } }); - // We arrange the inputs ourself to force it into a leaner structure because we know we + // Firstly, we render a pipelined step for pre-aggregation per worker, mapping rows to diffs + let arranged = + render_top1_preaggregation(collection.inner, order_key.clone(), Pipeline); + + // Secondly, we map rows back from the diff field in preparation for another step + // Note that we do not use explode here since it's unclear whether we'd like to implement Multiply in the Top1Monoid + let mut buffer = Default::default(); + let preaggregated = arranged.as_collection(|row, ()| row.clone()).inner.unary( + Pipeline, + "Top1MonotonicMapToRow", + move |_cap, _op| { + move |input, output| { + input.for_each(|time, data| { + data.swap(&mut buffer); + output.session(&time).give_iterator( + buffer.drain(..).map(|x| ((x.0, x.2.into_row()), x.1, 1)), + ); + }) + } + }, + ); + + // Thirdly, we render an exchange step to correctly shuffle among workers + let arranged = render_top1_preaggregation( + preaggregated, + order_key, + Exchange::new(move |((group_key, _), _, _): &((Row, _), _, _)| group_key.hashed()), + ); + + // Lastly, we compute the result by a final reduction + let result = arranged.reduce_abelian::<_, OrdValSpine<_, _, _, _>>("Top1Monotonic", { + move |_key, input, output| { + let accum = &input[0].1; + output.push((accum.row.clone(), 1)); + } + }); + // TODO(#7331): Here we discard the arranged output. + result.as_collection(|_k, v| v.clone()) + } + + fn render_top1_preaggregation( + input: Stream, + order_key: Vec, + pact: P, + ) -> Arranged>> + where + G: Scope, + G::Timestamp: Lattice, + P: ParallelizationContract, + { + // We allocate a single reference-counted object to represent the desired ordering + let shared = Rc::new(RefCell::new(monoids::Top1MonoidShared { + order_key, + left: DatumVec::new(), + right: DatumVec::new(), + })); + + // We arrange the input ourselves to force it into a leaner structure because we know we // won't care about values. // // TODO: Could we use explode here? We'd lose the diff>0 assert and we'd have to impl Mul // for the monoid, unclear if it's worth it. - let partial: Collection = collection - // TODO(#16549): Use explicit arrangement - .consolidate() - .inner - .map(move |((group_key, row), time, diff)| { - assert!(diff > 0); - // NB: Top1 can throw out the diff since we've asserted that it's > 0. A more - // general TopK monoid would have to account for diff. - ( - group_key, - time, - monoids::Top1Monoid { - row, - order_key: order_key.clone(), - }, - ) + let mut buffer = Default::default(); + let prepared: Collection = input + .unary(pact, "Top1MonotonicPrepare", move |_cap, _op| { + move |input, output| { + while let Some((cap, data)) = input.next() { + data.swap(&mut buffer); + output.session(&cap).give_iterator(buffer.drain(..).map( + |((group_key, row), time, diff)| { + assert!(diff > 0); + // NB: Top1 can throw out the diff since we've asserted that it's > 0. A more + // general TopK monoid would have to account for diff. + ( + (group_key, ()), + time, + monoids::Top1Monoid { + row, + shared: Rc::clone(&shared), + }, + ) + }, + )) + } + } }) .as_collection(); - let result = partial - // TODO(#16549): Use explicit arrangement - .arrange_by_self() - .reduce_abelian::<_, OrdValSpine<_, _, _, _>>("Top1Monotonic", { - move |_key, input, output| { - let accum = &input[0].1; - output.push((accum.row.clone(), 1)); - } - }); - // TODO(#7331): Here we discard the arranged output. - result.as_collection(|_k, v| v.clone()) + prepared.arrange_core::<_, RowSpine<_, _, _, _>>(Pipeline, "Top1MonotonicArrange") } fn render_intra_ts_thinning( @@ -367,6 +424,11 @@ where { let mut aggregates = HashMap::new(); let mut vector = Vec::new(); + let shared = Rc::new(RefCell::new(monoids::Top1MonoidShared { + order_key, + left: DatumVec::new(), + right: DatumVec::new(), + })); collection .inner .unary_notify( @@ -382,7 +444,7 @@ where for ((grp_row, row), record_time, diff) in vector.drain(..) { let monoid = monoids::Top1Monoid { row, - order_key: order_key.clone(), + shared: Rc::clone(&shared), }; let topk = agg_time.entry((grp_row, record_time)).or_insert_with( move || { @@ -401,7 +463,11 @@ where let mut session = output.session(&time); for ((grp_row, record_time), topk) in aggs { session.give_iterator(topk.into_iter().map(|(monoid, diff)| { - ((grp_row.clone(), monoid.row), record_time.clone(), diff) + ( + (grp_row.clone(), monoid.into_row()), + record_time.clone(), + diff, + ) })) } } @@ -508,32 +574,67 @@ pub mod topk_agg { /// Monoids for in-place compaction of monotonic streams. pub mod monoids { + use std::cell::RefCell; use std::cmp::Ordering; + use std::hash::{Hash, Hasher}; + use std::rc::Rc; use differential_dataflow::difference::Semigroup; - use serde::{Deserialize, Serialize}; use mz_expr::ColumnOrder; - use mz_repr::Row; + use mz_repr::{DatumVec, Row}; + + /// A shared portion of a thread-local top-1 monoid implementation. + #[derive(Debug)] + pub struct Top1MonoidShared { + pub order_key: Vec, + pub left: DatumVec, + pub right: DatumVec, + } - /// A monoid containing a row and an ordering. - #[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize, Hash)] + /// A monoid containing a row and a shared pointer to a shared structure. + /// Only suitable for thread-local aggregations. + #[derive(Debug, Clone)] pub struct Top1Monoid { pub row: Row, - pub order_key: Vec, + pub shared: Rc>, + } + + impl Top1Monoid { + pub fn into_row(self) -> Row { + self.row + } + } + + impl PartialEq for Top1Monoid { + fn eq(&self, other: &Self) -> bool { + self.row.eq(&other.row) + } + } + + impl Eq for Top1Monoid {} + + impl Hash for Top1Monoid { + fn hash(&self, state: &mut H) { + self.row.hash(state); + } } impl Ord for Top1Monoid { fn cmp(&self, other: &Self) -> Ordering { - debug_assert_eq!(self.order_key, other.order_key); - - // It might be nice to cache this row decoding like the non-monotonic codepath, but we'd - // have to store the decoded Datums in the same struct as the Row, which gets tricky. - let left: Vec<_> = self.row.unpack(); - let right: Vec<_> = other.row.unpack(); - mz_expr::compare_columns(&self.order_key, &left, &right, || left.cmp(&right)) + debug_assert!(Rc::ptr_eq(&self.shared, &other.shared)); + let Top1MonoidShared { + left, + right, + order_key, + } = &mut *self.shared.borrow_mut(); + + let left = left.borrow_with(&self.row); + let right = right.borrow_with(&other.row); + mz_expr::compare_columns(order_key, &left, &right, || left.cmp(&right)) } } + impl PartialOrd for Top1Monoid { fn partial_cmp(&self, other: &Self) -> Option { Some(self.cmp(other)) @@ -545,7 +646,7 @@ pub mod monoids { let cmp = (*self).cmp(rhs); // NB: Reminder that TopK returns the _minimum_ K items. if cmp == Ordering::Greater { - self.clone_from(rhs); + self.row.clone_from(&rhs.row); } } diff --git a/src/repr/src/row.rs b/src/repr/src/row.rs index 42bbc0844aefb..1d57a15c7c4f7 100644 --- a/src/repr/src/row.rs +++ b/src/repr/src/row.rs @@ -136,6 +136,10 @@ impl Clone for Row { data: SmallVec::from_slice(self.data.as_slice()), } } + + fn clone_from(&mut self, source: &Self) { + self.data.clone_from(&source.data); + } } impl Row {