diff --git a/differential-dataflow/examples/degrees.rs b/differential-dataflow/examples/degrees.rs index 75af1e798..341e3f7ea 100644 --- a/differential-dataflow/examples/degrees.rs +++ b/differential-dataflow/examples/degrees.rs @@ -34,7 +34,7 @@ fn main() { // .count_total(); // show us something about the collection, notice when done. - let probe = if inspect { + let (probe, _) = if inspect { degrs.inspect(|x| println!("observed: {:?}", x)) .probe() } diff --git a/differential-dataflow/examples/hello.rs b/differential-dataflow/examples/hello.rs index b02f9c506..0606ddc05 100644 --- a/differential-dataflow/examples/hello.rs +++ b/differential-dataflow/examples/hello.rs @@ -36,7 +36,7 @@ fn main() { .count_total(); // show us something about the collection, notice when done. - let probe = + let (probe, _) = out_degr_distr .filter(move |_| inspect) .inspect(|x| println!("observed: {:?}", x)) diff --git a/differential-dataflow/examples/itembased_cf.rs b/differential-dataflow/examples/itembased_cf.rs index 5b8018e1f..cdc2ec331 100644 --- a/differential-dataflow/examples/itembased_cf.rs +++ b/differential-dataflow/examples/itembased_cf.rs @@ -72,7 +72,8 @@ fn main() { let thresholded_similarities = jaccard_similarities .filter(|(_item_pair, jaccard)| *jaccard > 0.05); - thresholded_similarities.probe() + let (probe, _) = thresholded_similarities.probe(); + probe }); let num_interactions: usize = std::env::args().nth(1) diff --git a/differential-dataflow/src/collection.rs b/differential-dataflow/src/collection.rs index 6b38f834a..c13227f2c 100644 --- a/differential-dataflow/src/collection.rs +++ b/differential-dataflow/src/collection.rs @@ -144,9 +144,9 @@ impl Collection { /// /// This probe is used to determine when the state of the Collection has stabilized and can /// be read out. - pub fn probe(self) -> probe::Handle { - self.inner - .probe() + pub fn probe(self) -> (probe::Handle, Self) { + let (handle, stream) = self.inner.probe(); + (handle, stream.as_collection()) } /// Attaches a timely dataflow probe to the output of a Collection. /// diff --git a/differential-dataflow/src/input.rs b/differential-dataflow/src/input.rs index df21349cb..3fd612107 100644 --- a/differential-dataflow/src/input.rs +++ b/differential-dataflow/src/input.rs @@ -30,7 +30,7 @@ pub trait Input : TimelyInput { /// let (mut handle, probe) = worker.dataflow::<(),_,_>(|scope| { /// // create input handle and collection. /// let (handle, data) = scope.new_collection(); - /// let probe = data.map(|x| x * 2) + /// let (probe, _) = data.map(|x| x * 2) /// .inspect(|x| println!("{:?}", x)) /// .probe(); /// (handle, probe) @@ -56,7 +56,7 @@ pub trait Input : TimelyInput { /// let (mut handle, probe) = worker.dataflow::<(),_,_>(|scope| { /// // create input handle and collection. /// let (handle, data) = scope.new_collection_from(0 .. 10); - /// let probe = data.map(|x| x * 2) + /// let (probe, _) = data.map(|x| x * 2) /// .inspect(|x| println!("{:?}", x)) /// .probe(); /// (handle, probe) @@ -82,7 +82,7 @@ pub trait Input : TimelyInput { /// let (mut handle, probe) = worker.dataflow::<(),_,_>(|scope| { /// // create input handle and collection. /// let (handle, data) = scope.new_collection_from(0 .. 10); - /// let probe = data.map(|x| x * 2) + /// let (probe, _) = data.map(|x| x * 2) /// .inspect(|x| println!("{:?}", x)) /// .probe(); /// (handle, probe) @@ -142,7 +142,7 @@ impl Input for G where ::Timestamp: Lattice { /// let (mut handle, probe) = worker.dataflow(|scope| { /// // create input handle and collection. /// let (handle, data) = scope.new_collection_from(0 .. 10); -/// let probe = data.map(|x| x * 2) +/// let (probe, _) = data.map(|x| x * 2) /// .inspect(|x| println!("{:?}", x)) /// .probe(); /// (handle, probe) diff --git a/differential-dataflow/tests/import.rs b/differential-dataflow/tests/import.rs index d8d733e84..747e45b72 100644 --- a/differential-dataflow/tests/import.rs +++ b/differential-dataflow/tests/import.rs @@ -113,7 +113,8 @@ fn test_import_completed_dataflow() { let (input, edges) = scope.new_input(); let arranged = edges.as_collection() .arrange_by_key(); - (input, arranged.trace.clone(), arranged.stream.probe()) + let (probe, _) = arranged.stream.probe(); + (input, arranged.trace.clone(), probe) }); for (t, changes) in input_epochs.into_iter().enumerate() { @@ -138,7 +139,7 @@ fn test_import_completed_dataflow() { .as_collection(|k,v| (k.clone(), v.clone())) .inner .exchange(|_| 0); - let probe = stream.clone().probe(); // <-- ack terrible! + let (probe, stream) = stream.probe(); let captured = stream.capture(); (probe, captured,) }); @@ -175,7 +176,8 @@ fn test_import_stalled_dataflow() { .to_collection(scope) .arrange_by_self(); - (arranged.trace, arranged.stream.probe()) + let (probe, _) = arranged.stream.probe(); + (arranged.trace, probe) }); input.insert("Hello".to_owned()); @@ -190,7 +192,8 @@ fn test_import_stalled_dataflow() { worker.step_while(|| probe1.less_than(input.time())); let probe2 = worker.dataflow(|scope| { - trace.import(scope).stream.probe() + let (probe, _) = trace.import(scope).stream.probe(); + probe }); worker.step();