Function timely::dataflow::operators::generic::operator::source

pub fn source<G: Scope, D, B, L>(
    scope: &G,
    name: &str,
    constructor: B
) -> Stream<G, D> where
    D: Data,
    B: FnOnce(Capability<G::Timestamp>) -> L,
    L: FnMut(&mut OutputHandle<G::Timestamp, D, Tee<G::Timestamp, D>>) + 'static, 

Creates a new data stream source for a scope.

The source is defined by a name and a constructor. The constructor takes a default capability and returns a closure that can be called repeatedly on an output handle. That closure is then invoked repeatedly, and is expected to eventually send data, and to downgrade and eventually release its capabilities.

Examples

use timely::dataflow::operators::Inspect;
use timely::dataflow::operators::generic::operator::source;
use timely::dataflow::Scope;

// Example: a source that emits the values of its own timestamps, advancing
// its capability by one after each emission and stopping once the time exceeds 20.
timely::example(|scope| {

    // The constructor closure receives the initial capability and returns the
    // closure that is called with the output handle.
    source(scope, "Source", |capability| {
        // Hold the capability in an `Option` so it can later be replaced by `None`.
        let mut cap = Some(capability);
        move |output| {

            // Set inside the `if let` and acted on after it
            // (presumably because `cap` is still borrowed inside that block).
            let mut done = false;
            if let Some(cap) = cap.as_mut() {
                // Open a session with the current capability and send the
                // `inner` value of its timestamp as the datum.
                let mut time = cap.time().clone();
                output.session(&cap)
                      .give(cap.time().inner);

                // Downgrade the capability: advance the copied time by one and
                // replace the held capability with one for that later time.
                time.inner += 1;
                *cap = cap.delayed(&time);
                // Finish once the advanced time exceeds 20.
                done = time.inner > 20;
            }

            // Replace the held capability with `None`; on later invocations the
            // `if let` above no longer matches, so nothing further is sent.
            if done { cap = None; }
        }
    })
    // Print every record produced by the source.
    .inspect(|x| println!("number: {:?}", x));
});