Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion differential-dataflow/examples/degrees.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ fn main() {
// .count_total();

// show us something about the collection, notice when done.
let probe = if inspect {
let (probe, _) = if inspect {
degrs.inspect(|x| println!("observed: {:?}", x))
.probe()
}
Expand Down
2 changes: 1 addition & 1 deletion differential-dataflow/examples/hello.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ fn main() {
.count_total();

// show us something about the collection, notice when done.
let probe =
let (probe, _) =
out_degr_distr
.filter(move |_| inspect)
.inspect(|x| println!("observed: {:?}", x))
Expand Down
3 changes: 2 additions & 1 deletion differential-dataflow/examples/itembased_cf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ fn main() {
let thresholded_similarities = jaccard_similarities
.filter(|(_item_pair, jaccard)| *jaccard > 0.05);

thresholded_similarities.probe()
let (probe, _) = thresholded_similarities.probe();
probe
});

let num_interactions: usize = std::env::args().nth(1)
Expand Down
6 changes: 3 additions & 3 deletions differential-dataflow/src/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,9 @@ impl<G: Scope, C: Container> Collection<G, C> {
///
/// This probe is used to determine when the state of the Collection has stabilized and can
/// be read out.
pub fn probe(self) -> probe::Handle<G::Timestamp> {
self.inner
.probe()
pub fn probe(self) -> (probe::Handle<G::Timestamp>, Self) {
let (handle, stream) = self.inner.probe();
(handle, stream.as_collection())
}
/// Attaches a timely dataflow probe to the output of a Collection.
///
Expand Down
8 changes: 4 additions & 4 deletions differential-dataflow/src/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
/// let (mut handle, probe) = worker.dataflow::<(),_,_>(|scope| {
/// // create input handle and collection.
/// let (handle, data) = scope.new_collection();
/// let probe = data.map(|x| x * 2)
/// let (probe, _) = data.map(|x| x * 2)
/// .inspect(|x| println!("{:?}", x))
/// .probe();
/// (handle, probe)
Expand All @@ -56,7 +56,7 @@
/// let (mut handle, probe) = worker.dataflow::<(),_,_>(|scope| {
/// // create input handle and collection.
/// let (handle, data) = scope.new_collection_from(0 .. 10);
/// let probe = data.map(|x| x * 2)
/// let (probe, _) = data.map(|x| x * 2)
/// .inspect(|x| println!("{:?}", x))
/// .probe();
/// (handle, probe)
Expand All @@ -82,7 +82,7 @@
/// let (mut handle, probe) = worker.dataflow::<(),_,_>(|scope| {
/// // create input handle and collection.
/// let (handle, data) = scope.new_collection_from(0 .. 10);
/// let probe = data.map(|x| x * 2)
/// let (probe, _) = data.map(|x| x * 2)
/// .inspect(|x| println!("{:?}", x))
/// .probe();
/// (handle, probe)
Expand Down Expand Up @@ -142,7 +142,7 @@
/// let (mut handle, probe) = worker.dataflow(|scope| {
/// // create input handle and collection.
/// let (handle, data) = scope.new_collection_from(0 .. 10);
/// let probe = data.map(|x| x * 2)
/// let (probe, _) = data.map(|x| x * 2)
/// .inspect(|x| println!("{:?}", x))
/// .probe();
/// (handle, probe)
Expand Down Expand Up @@ -198,7 +198,7 @@
impl<T: Timestamp+Clone, D: Data, R: Semigroup+'static> InputSession<T, D, R> {

/// Introduces a handle as collection.
pub fn to_collection<G: TimelyInput>(&mut self, scope: &mut G) -> VecCollection<G, D, R>

Check warning on line 201 in differential-dataflow/src/input.rs

View workflow job for this annotation

GitHub Actions / Cargo clippy

bound is defined in more than one place
where
G: ScopeParent<Timestamp=T>,
{
Expand Down
11 changes: 7 additions & 4 deletions differential-dataflow/tests/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ fn test_import_completed_dataflow() {
let (input, edges) = scope.new_input();
let arranged = edges.as_collection()
.arrange_by_key();
(input, arranged.trace.clone(), arranged.stream.probe())
let (probe, _) = arranged.stream.probe();
(input, arranged.trace.clone(), probe)
});

for (t, changes) in input_epochs.into_iter().enumerate() {
Expand All @@ -138,7 +139,7 @@ fn test_import_completed_dataflow() {
.as_collection(|k,v| (k.clone(), v.clone()))
.inner
.exchange(|_| 0);
let probe = stream.clone().probe(); // <-- ack terrible!
let (probe, stream) = stream.probe();
let captured = stream.capture();
(probe, captured,)
});
Expand Down Expand Up @@ -175,7 +176,8 @@ fn test_import_stalled_dataflow() {
.to_collection(scope)
.arrange_by_self();

(arranged.trace, arranged.stream.probe())
let (probe, _) = arranged.stream.probe();
(arranged.trace, probe)
});

input.insert("Hello".to_owned());
Expand All @@ -190,7 +192,8 @@ fn test_import_stalled_dataflow() {
worker.step_while(|| probe1.less_than(input.time()));

let probe2 = worker.dataflow(|scope| {
trace.import(scope).stream.probe()
let (probe, _) = trace.import(scope).stream.probe();
probe
});

worker.step();
Expand Down
Loading