//! Methods which describe an operator's topology, and the progress it makes.

use std::default::Default;

use progress::{Timestamp, ChangeBatch, Antichain};


/// Methods for describing an operator's topology, and the progress it makes.
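///
/// As a rough illustration, an implementor only has to supply the four methods that have no
/// default. The sketch below is not taken from this crate: `Forward` is a hypothetical operator
/// with one input and one output, and the sketch assumes `usize` implements `Timestamp`. It is
/// marked `ignore` because it has not been compiled against the crate.
///
/// ```ignore
/// struct Forward;
///
/// impl Operate<usize> for Forward {
///     fn inputs(&self) -> usize { 1 }
///     fn outputs(&self) -> usize { 1 }
///     fn pull_internal_progress(&mut self,
///                               _consumed: &mut [ChangeBatch<usize>],
///                               _internal: &mut [ChangeBatch<usize>],
///                               _produced: &mut [ChangeBatch<usize>]) -> bool {
///         // Nothing was consumed, held, or produced, so no unreported work remains.
///         false
///     }
///     fn name(&self) -> String { "Forward".to_owned() }
/// }
/// ```
///
/// With only these methods supplied, the defaults report `local()` and `notify_me()` as `true`,
/// and `get_internal_summary` returns one default summary per `(input, output)` pair with no
/// initial capabilities.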
pub trait Operate<T: Timestamp> {

    /// Indicates if the operator is strictly local to this worker.
    ///
    /// A parent scope must understand whether the progress information returned by the worker
    /// reflects only this worker's progress, so that it knows whether to send and receive the
    /// corresponding progress messages to its peers. If the operator is strictly local, the
    /// parent scope must exchange this information with its peers, whereas if the operator is
    /// itself implemented by the same set of workers, the parent scope understands that the
    /// progress information already reflects the aggregate information among the workers.
    ///
    /// This is a coarse approximation to refined worker sets. In a future better world, operators
    /// would explain how their implementations are partitioned, so that a parent scope knows what
    /// progress information to exchange with which peers. Right now the two choices are either
    /// "all" or "none", but it could be more detailed. In the more detailed case, this method
    /// should / could return a pair (index, peers), indicating the group id of the worker out of
    /// how many groups. This becomes complicated, as a full all-to-all exchange would result in
    /// multiple copies of the same progress messages (but aggregated variously) arriving at
    /// arbitrary times.
    fn local(&self) -> bool { true }

    /// The number of inputs.
    fn inputs(&self) -> usize;
    /// The number of outputs.
    fn outputs(&self) -> usize;

    /// Fetches summary information about internal structure of the operator.
    ///
    /// Each operator must summarize its internal structure by a map from pairs `(input, output)`
    /// to an antichain of timestamp summaries, indicating how a timestamp on any of its inputs may
    /// be transformed to timestamps on any of its outputs. In the returned value the map is
    /// indexed first by input and then by output.
    ///
    /// Each operator must also indicate whether it initially holds any capabilities on any of its
    /// outputs, so that the parent operator can properly initialize its progress information.
    ///
    /// The default behavior is to indicate that timestamps on any input can emerge unchanged on
    /// any output, and no initial capabilities are held.
    fn get_internal_summary(&mut self) -> (Vec<Vec<Antichain<T::Summary>>>, Vec<ChangeBatch<T>>) {
        (vec![vec![Antichain::from_elem(Default::default()); self.outputs()]; self.inputs()],
         vec![ChangeBatch::new(); self.outputs()])
    }

    /// Presents summary information about the external structure around the operator.
    ///
    /// Each operator exists in the context of a parent scope, and the edges and other operators it
    /// hosts represent paths messages may take from this operator's outputs back to its inputs. For
    /// an operator to correctly understand the implications of local progress statements, it must
    /// understand how messages it produces may eventually return to its inputs.
    ///
    /// The parent scope must also provide initial capabilities for each of the inputs, reflecting
    /// work elsewhere in the timely computation. Note: it is not clear whether the parent must not
    /// include capabilities expressed by the operator itself. It seems possible to exclude such
    /// capabilities, if it would help the operator, but the operator should not yet rely on any
    /// specific behavior.
    fn set_external_summary(&mut self, _summaries: Vec<Vec<Antichain<T::Summary>>>, _frontier: &mut [ChangeBatch<T>]) { }

    /// Reports a summary of progress statements external to the operator and its peer group.
    ///
    /// This report summarizes *all* of the external world, including updates issued by the operator
    /// itself. This is important, and means that there is an ordering constraint that needs to be
    /// enforced by the operator: before reporting any summarized progress to its parent, a child
    /// must install the non-summarized parts (internal messages, capabilities) locally. Otherwise,
    /// the child may learn about external "progress" corresponding to its own actions, and without
    /// noting the consequences of those actions, will be in a bit of a pickle.
    ///
    /// Note: the callee is expected to consume the contents of `external` to indicate acknowledgement.
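    ///
    /// An operator that tracks its input frontiers would likely override the default rather than
    /// discard the updates. A minimal sketch, assuming there is one batch per input (as with the
    /// frontier passed to `set_external_summary`), that `ChangeBatch::drain` yields
    /// `(time, delta)` pairs, and that `record_input_change` is a hypothetical helper on the
    /// implementor:
    ///
    /// ```ignore
    /// fn push_external_progress(&mut self, external: &mut [ChangeBatch<T>]) {
    ///     for (input, updates) in external.iter_mut().enumerate() {
    ///         // Draining leaves `updates` empty, which acknowledges receipt.
    ///         for (time, delta) in updates.drain() {
    ///             self.record_input_change(input, time, delta);
    ///         }
    ///     }
    /// }
    /// ```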
    fn push_external_progress(&mut self, external: &mut [ChangeBatch<T>]) {
        // default implementation just drains the external updates
        for updates in external.iter_mut() {
            updates.clear();
        }
    }

    /// Retrieves a summary of progress statements internal to the operator.
    ///
    /// Returns a bool indicating if there is any unreported work remaining (e.g. work that doesn't
    /// project on an output).
    ///
    /// Note: not "internal to the operator and its peer group". The operator instance should only
    /// report progress performed by its own instance. The parent scope will figure out what to do
    /// with this information (most likely exchange it with its peers). There does seem to be the
    /// opportunity to optimize this, but it may complicate the life of the parent to know which of
    /// its children are reporting partial information and which are complete.
    fn pull_internal_progress(&mut self, consumed: &mut [ChangeBatch<T>],          // to populate
                                         internal: &mut [ChangeBatch<T>],          // to populate
                                         produced: &mut [ChangeBatch<T>]) -> bool; // to populate

    /// A descriptive name for the operator.
    fn name(&self) -> String;

    /// Indicates whether the operator requires `push_external_progress` information.
    fn notify_me(&self) -> bool { true }
}