1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101
//! Methods which describe an operators topology, and the progress it makes. use std::default::Default; use progress::{Timestamp, ChangeBatch, Antichain}; /// Methods for describing an operators topology, and the progress it makes. pub trait Operate<T: Timestamp> { /// Indicates if the operator is strictly local to this worker. /// /// A parent scope must understand whether the progress information returned by the worker /// reflects only this worker's progress, so that it knows whether to send and receive the /// corresponding progress messages to its peers. If the operator is strictly local, it must /// exchange this information, whereas if the operator is itself implemented by the same set /// of workers, the parent scope understands that progress information already reflects the /// aggregate information among the workers. /// /// This is a coarse approximation to refined worker sets. In a future better world, operators /// would explain how their implementations are partitioned, so that a parent scope knows what /// progress information to exchange with which peers. Right now the two choices are either /// "all" or "none", but it could be more detailed. In the more detailed case, this method /// should / could return a pair (index, peers), indicating the group id of the worker out of /// how many groups. This becomes complicated, as a full all-to-all exchange would result in /// multiple copies of the same progress messages (but aggregated variously) arriving at /// arbitrary times. fn local(&self) -> bool { true } /// The number of inputs. fn inputs(&self) -> usize; /// The number of outputs. fn outputs(&self) -> usize; /// Fetches summary information about internal structure of the operator. /// /// Each operator must summarize its internal structure by a map from pairs `(input, output)` /// to an antichain of timestamp summaries, indicating how a timestamp on any of its inputs may /// be transformed to timestamps on any of its outputs. /// /// Each operator must also indicate whether it initially holds any capabilities on any of its /// outputs, so that the parent operator can properly initialize its progress information. /// /// The default behavior is to indicate that timestamps on any input can emerge unchanged on /// any output, and no initial capabilities are held. fn get_internal_summary(&mut self) -> (Vec<Vec<Antichain<T::Summary>>>, Vec<ChangeBatch<T>>) { (vec![vec![Antichain::from_elem(Default::default()); self.outputs()]; self.inputs()], vec![ChangeBatch::new(); self.outputs()]) } /// Presents summary information about the external structure around the operator. /// /// Each operator exists in the context of a parent scope, and the edges and other operators it /// hosts represent paths messages may take from this operators outputs back to its inputs. For /// an operator to correctly understand the implications of local progress statements, it must /// understand how messages it produces may eventually return to its inputs. /// /// The parent scope must also provide initial capabilities for each of the inputs, reflecting /// work elsewhere in the timely computation. Note: it is not clear whether the parent must not /// include capabilities expressed by the operator itself. It seems possible to exclude such /// capabilities, if it would help the operator, but the operator should not yet rely on any /// specific behavior. fn set_external_summary(&mut self, _summaries: Vec<Vec<Antichain<T::Summary>>>, _frontier: &mut [ChangeBatch<T>]) { } /// Reports a summary of progress statements external to the operator and its peer group. /// /// This report summarizes *all* of the external world, including updates issued by the operator /// itself. This is important, and means that there is an ordering constraint that needs to be /// enforced by the operator: before reporting any summarized progress to its parent, a child /// must install the non-summarized parts (interal messages, capabilities) locally. Otherwise, /// the child may learn about external "progress" corresponding to its own actions, and without /// noting the consequences of those actions, will be in a bit of a pickle. /// /// Note: Callee is expected to consume the contents of _external to indicate acknowledgement. fn push_external_progress(&mut self, external: &mut [ChangeBatch<T>]) { // default implementation just drains the external updates for updates in external.iter_mut() { updates.clear(); } } /// Retrieves a summary of progress statements internal to the operator. /// /// Returns a bool indicating if there is any unreported work remaining (e.g. work that doesn't /// project on an output). /// /// Note: not "internal to the operator and its peer group". The operator instance should only /// report progress performed by its own instance. The parent scope will figure out what to do /// with this information (mostly likely exchange it with its peers). There does seem to be the /// opportunity to optimize this, but it may complicate the life of the parent to know which of /// its children are reporting partial information and which are complete. fn pull_internal_progress(&mut self, consumed: &mut [ChangeBatch<T>], // to populate internal: &mut [ChangeBatch<T>], // to populate produced: &mut [ChangeBatch<T>]) -> bool; // to populate /// A descripitive name for the operator fn name(&self) -> String; /// Indicates of whether the operator requires `push_external_progress` information or not. fn notify_me(&self) -> bool { true } }