1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
//! Manages pointstamp reachability within a graph.
//!
//! Timely dataflow is concerned with understanding and communicating the potential
//! for capabilities to reach nodes in a directed graph, by following paths through
//! the graph (along edges and through nodes). This module contains one abstraction
//! for managing this information.
//!
//! # Examples
//!
//! ```rust
//! use timely::progress::frontier::Antichain;
//! use timely::progress::nested::subgraph::{Source, Target};
//! use timely::progress::nested::reachability_neu::{Builder, Tracker};
//!
//! // allocate a new empty topology builder.
//! let mut builder = Builder::<usize>::new();
//! 
//! // Each node with one input connected to one output.
//! builder.add_node(0, 1, 1, vec![vec![Antichain::from_elem(0)]]);
//! builder.add_node(1, 1, 1, vec![vec![Antichain::from_elem(0)]]);
//! builder.add_node(2, 1, 1, vec![vec![Antichain::from_elem(1)]]);
//!
//! // Connect nodes in sequence, looping around to the first from the last.
//! builder.add_edge(Source { index: 0, port: 0}, Target { index: 1, port: 0} );
//! builder.add_edge(Source { index: 1, port: 0}, Target { index: 2, port: 0} );
//! builder.add_edge(Source { index: 2, port: 0}, Target { index: 0, port: 0} );
//!
//! // Construct a reachability tracker.
//! let mut tracker = builder.build();
//!
//! // Introduce a pointstamp at the output of the first node.
//! tracker.update_source(Source { index: 0, port: 0}, 17, 1);
//!
//! // Propagate changes; until this call updates are simply buffered.
//! tracker.propagate_all();
//!
//! let mut results = tracker.pushed().drain().collect::<Vec<_>>();
//! results.sort();
//!
//! assert_eq!(results.len(), 3);
//! assert_eq!(results[0], ((Target { index: 0, port: 0 }, 18), 1));
//! assert_eq!(results[1], ((Target { index: 1, port: 0 }, 17), 1));
//! assert_eq!(results[2], ((Target { index: 2, port: 0 }, 17), 1));
//!
//! // Retract the pointstamp from the output of the first node.
//! tracker.update_source(Source { index: 0, port: 0}, 17, -1);
//!
//! // Propagate changes; until this call updates are simply buffered.
//! tracker.propagate_all();
//!
//! let mut results = tracker.pushed().drain().collect::<Vec<_>>();
//! results.sort();
//!
//! assert_eq!(results.len(), 3);
//! assert_eq!(results[0], ((Target { index: 0, port: 0 }, 18), -1));
//! assert_eq!(results[1], ((Target { index: 1, port: 0 }, 17), -1));
//! assert_eq!(results[2], ((Target { index: 2, port: 0 }, 17), -1));
//! ```

use std::collections::BinaryHeap;

use progress::Timestamp;
use progress::nested::{Source, Target};
use progress::ChangeBatch;

use progress::frontier::{Antichain, MutableAntichain};
use progress::timestamp::PathSummary;


/// A topology builder, which can summarize reachability along paths.
///
/// A `Builder` takes descriptions of the nodes and edges in a graph, and compiles
/// a static summary of the minimal actions a timestamp must endure going from any
/// input or output port to a destination input port.
///
/// A graph is provided as (i) several indexed nodes, each with some number of input
/// and output ports, and each with a summary of the internal paths connecting each
/// input to each output, and (ii) a set of edges connecting output ports to input
/// ports. Edges do not adjust timestamps; only nodes do this.
///
/// The resulting summary describes, for each origin port in the graph and destination
/// input port, a set of incomparable path summaries, each describing what happens to
/// a timestamp as it moves along the path. There may be multiple summaries for each
/// pair of origin and destination due to the fact that the actions on timestamps may
/// not be totally ordered (e.g., "increment the timestamp" and "take the maximum of
/// the timestamp and seven").
///
/// # Examples
///
/// ```rust
/// use timely::progress::frontier::Antichain;
/// use timely::progress::nested::subgraph::{Source, Target};
/// use timely::progress::nested::reachability_neu::Builder;
///
/// // allocate a new empty topology builder.
/// let mut builder = Builder::<usize>::new();
/// 
/// // Each node with one input connected to one output.
/// builder.add_node(0, 1, 1, vec![vec![Antichain::from_elem(0)]]);
/// builder.add_node(1, 1, 1, vec![vec![Antichain::from_elem(0)]]);
/// builder.add_node(2, 1, 1, vec![vec![Antichain::from_elem(1)]]);
///
/// // Connect nodes in sequence, looping around to the first from the last.
/// builder.add_edge(Source { index: 0, port: 0}, Target { index: 1, port: 0} );
/// builder.add_edge(Source { index: 1, port: 0}, Target { index: 2, port: 0} );
/// builder.add_edge(Source { index: 2, port: 0}, Target { index: 0, port: 0} );
///
/// // Summarize reachability information.
/// let tracker = builder.build();
/// ```

#[derive(Clone, Debug)]
pub struct Builder<T: Timestamp> {
    /// Internal connections within hosted operators.
    ///
    /// Indexed by operator index, then input port, then output port. This is the
    /// same format returned by `get_internal_summary`, as if we simply appended
    /// all of the summaries for the hosted nodes.
    pub nodes: Vec<Vec<Vec<Antichain<T::Summary>>>>,
    /// Direct connections from sources to targets.
    ///
    /// Edges do not affect timestamps, so we only need to know the connectivity.
    /// Indexed by operator index then output port; each entry lists the targets
    /// attached to that output port.
    pub edges: Vec<Vec<Vec<Target>>>,
    /// Numbers of inputs and outputs for each node, stored as `(inputs, outputs)`
    /// and indexed by operator index.
    pub shape: Vec<(usize, usize)>,
}

impl<T: Timestamp> Builder<T> {

    /// Creates a topology builder that describes no nodes and no edges.
    pub fn new() -> Self {
        Builder { nodes: vec![], edges: vec![], shape: vec![] }
    }

    /// Registers node `index`, recording its port counts and internal path summaries.
    ///
    /// `summary` is indexed by input port and then by output port. Node indices that
    /// are skipped over are filled with empty placeholders with zero inputs and outputs.
    ///
    /// Calling this again for the same `index` overwrites the stored summary and shape,
    /// rather than anything more sophisticated. Edges already recorded for the node are
    /// kept when the number of outputs is unchanged, and are discarded otherwise.
    pub fn add_node(&mut self, index: usize, inputs: usize, outputs: usize, summary: Vec<Vec<Antichain<T::Summary>>>) {

        // Debug builds verify that a summary exists for every (input, output) pair.
        debug_assert_eq!(inputs, summary.len());
        for per_input in &summary {
            debug_assert_eq!(outputs, per_input.len());
        }

        // Grow the three parallel vectors together until `index` is addressable.
        while self.nodes.len() <= index {
            self.nodes.push(vec![]);
            self.edges.push(vec![]);
            self.shape.push((0, 0));
        }

        self.nodes[index] = summary;
        {
            // A different number of outputs invalidates whatever edges were recorded.
            let outgoing = &mut self.edges[index];
            if outgoing.len() != outputs {
                *outgoing = vec![Vec::new(); outputs];
            }
        }
        self.shape[index] = (inputs, outputs);
    }

    /// Records an edge from the output port `source` to the input port `target`.
    ///
    /// The source node and port must already have been introduced with `add_node`;
    /// otherwise the indexing below panics. Debug builds additionally assert that
    /// both the source port and the target port lie within the registered shapes.
    /// In release builds the target is stored without any validation.
    pub fn add_edge(&mut self, source: Source, target: Target) {

        // Assert that the edge is between existing ports.
        debug_assert!(source.port < self.shape[source.index].1);
        debug_assert!(target.port < self.shape[target.index].0);

        let outgoing = &mut self.edges[source.index][source.port];
        outgoing.push(target);
    }

    /// Compiles the current nodes and edges into a `Tracker`.
    ///
    /// The builder is only borrowed, so it can continue to be used afterwards. This
    /// method is the natural place for validating the described graph (references to
    /// undefined nodes and ports, or self-loops with default summaries, which are a
    /// serious liveness issue); at present it delegates directly to
    /// `Tracker::allocate_from`.
    pub fn build(&self) -> Tracker<T> {
        Tracker::<T>::allocate_from(self)
    }
}

/// An interactive tracker of propagated reachability information.
///
/// A `Tracker` tracks, for a fixed graph topology, the consequences of
/// pointstamp changes at various node input and output ports. These changes may
/// alter the potential pointstamps that could arrive at downstream input ports.
///
/// Changes are supplied through `update_source` and `update_target`, which only
/// buffer them. A call to `propagate_all` applies the buffered changes, and the
/// resulting changes at input ports can then be read through `pushed`.

#[derive(Debug)]
pub struct Tracker<T:Timestamp> {

    // TODO: All of the sizes of these allocations are static (except internal to `ChangeBatch`).
    //       It seems we should be able to flatten most of these so that there are a few allocations
    //       independent of the numbers of nodes and ports and such.
    //
    // TODO: We could also change the internal representation to be a graph of targets, using usize
    //       identifiers for each, so that internally we needn't use multiple levels of indirection.
    //       This may make more sense once we commit to topologically ordering the targets.

    /// Pointstamps held at each output port, indexed by node index then output port.
    ///
    /// Each source and target has a mutable antichain to ensure that we track their discrete frontiers,
    /// rather than their multiplicities. We separately track the frontiers resulting from propagated
    /// frontiers, to protect them from transient negativity in inbound target updates.
    sources: Vec<Vec<MutableAntichain<T>>>,
    /// Pointstamps held at each input port, indexed by node index then input port.
    targets: Vec<Vec<MutableAntichain<T>>>,
    /// Propagated pointstamps at each input port, indexed by node index then input port.
    /// These are updated only while draining the worklist in `propagate_all`.
    pusheds: Vec<Vec<MutableAntichain<T>>>,

    /// Source and target changes are buffered, which allows us to delay processing until propagation,
    /// and so consolidate updates, but to leap directly to those frontiers that may have changed.
    source_changes: ChangeBatch<(Source, T)>,
    /// Buffered changes at input ports; see `source_changes`.
    target_changes: ChangeBatch<(Target, T)>,

    /// Worklist of updates to perform, ordered by increasing timestamp and target.
    ///
    /// Entries are `(time, target, diff)` wrapped in `OrderReversed`, so that the
    /// heap yields the least entry first.
    target_worklist: BinaryHeap<OrderReversed<(T, Target, i64)>>,

    /// Buffer of consequent changes, exposed to callers through `pushed`.
    pushed_changes: ChangeBatch<(Target, T)>,

    /// Internal connections within hosted operators.
    ///
    /// Indexed by operator index, then input port, then output port. This is the
    /// same format returned by `get_internal_summary`, as if we simply appended
    /// all of the summaries for the hosted nodes.
    nodes: Vec<Vec<Vec<Antichain<T::Summary>>>>,
    /// Direct connections from sources to targets.
    ///
    /// Edges do not affect timestamps, so we only need to know the connectivity.
    /// Indexed by operator index then output port.
    edges: Vec<Vec<Vec<Target>>>,

    /// Compiled reach from each target to targets one hop downstream.
    ///
    /// Indexed by node index then input port. Each entry pairs a downstream target
    /// with one path summary from that input port to the output port feeding it.
    compiled: Vec<Vec<Vec<(Target, T::Summary)>>>,
}

impl<T:Timestamp> Tracker<T> {

    /// Updates the count for a time at a target.
    #[inline]
    pub fn update_target(&mut self, target: Target, time: T, value: i64) {
        self.target_changes.update((target, time), value);
    }
    /// Updates the count for a time at a source.
    #[inline]
    pub fn update_source(&mut self, source: Source, time: T, value: i64) {
        self.source_changes.update((source, time), value);
    }

    /// Allocate a new `Tracker` using the shape from `summaries`.
    pub fn allocate_from(builder: &Builder<T>) -> Self {

        let mut sources = Vec::with_capacity(builder.shape.len());
        let mut targets = Vec::with_capacity(builder.shape.len());
        let mut pusheds = Vec::with_capacity(builder.shape.len());

        let mut compiled = Vec::with_capacity(builder.shape.len());

        // Allocate buffer space for each input and input port.
        for (node, &(inputs, outputs)) in builder.shape.iter().enumerate() {
            sources.push(vec![MutableAntichain::new(); outputs]);
            targets.push(vec![MutableAntichain::new(); inputs]);
            pusheds.push(vec![MutableAntichain::new(); inputs]);

            let mut compiled_node = vec![Vec::new(); inputs];
            for input in 0 .. inputs {
                for output in 0 .. outputs {
                    for summary in builder.nodes[node][input][output].elements().iter() {
                        for &target in builder.edges[node][output].iter() {
                            compiled_node[input].push((target, summary.clone()));
                        }
                    }
                }
            }
            compiled.push(compiled_node);
        }

        Tracker {
            sources,
            targets,
            pusheds,
            source_changes: ChangeBatch::new(),
            target_changes: ChangeBatch::new(),
            target_worklist: BinaryHeap::new(),
            pushed_changes: ChangeBatch::new(),
            nodes: builder.nodes.clone(),
            edges: builder.edges.clone(),
            compiled,
        }
    }

    /// Propagates all pending updates.
    pub fn propagate_all(&mut self) {

        // Filter each target change through `self.targets`.
        for ((target, time), diff) in self.target_changes.drain() {
            let target_worklist = &mut self.target_worklist;
            self.targets[target.index][target.port].update_iter_and(Some((time, diff)), |time, diff| {
                target_worklist.push(OrderReversed::new((time.clone(), target, diff)))
            })
        }

        // Filter each source change through `self.sources` and then along edges.
        for ((source, time), diff) in self.source_changes.drain() {
            let target_worklist = &mut self.target_worklist;
            let edges = &self.edges[source.index][source.port];
            self.sources[source.index][source.port].update_iter_and(Some((time, diff)), |time, diff| {
                for &target in edges.iter() {
                    target_worklist.push(OrderReversed::new((time.clone(), target, diff)))
                }
            })
        }

        while !self.target_worklist.is_empty() {

            // This iteration we will drain all (target, time) work items.
            let (time, target, mut diff) = self.target_worklist.pop().unwrap().element;

            // Drain any other updates that might have the same time; accumulate difference.
            while self.target_worklist.peek().map(|x| (x.element.0 == time) && (x.element.1 == target)).unwrap_or(false) {
                diff += self.target_worklist.pop().unwrap().element.2;
            }

            // Only act if there is a net change, positive or negative.
            if diff != 0 {

                // Borrow various self fields to appease Rust.
                let pushed_changes = &mut self.pushed_changes;
                let target_worklist = &mut self.target_worklist;
                let _edges = &self.edges[target.index];
                let _nodes = &self.nodes[target.index][target.port];
                let _compiled = &self.compiled[target.index][target.port];

                // Although single-element updates may seem wasteful, they are important for termination.
                self.pusheds[target.index][target.port].update_iter_and(Some((time, diff)), |time, diff| {

                    // Identify the change in the out change list.
                    pushed_changes.update((target, time.clone()), diff);

                    // When a target has its frontier change we should communicate the change to downstream
                    // targets as well. This means traversing the operator to its outputs.
                    //
                    // We have two implementations, first traversing `nodes` and `edges`, and second using
                    // a compiled representation that flattens all these lists (but has redundant calls to
                    // `results_in`).

                    // // Version 1.
                    // for (output, summaries) in _nodes.iter().enumerate() {
                    //     for summary in summaries.elements().iter() {
                    //         if let Some(new_time) = summary.results_in(time) {
                    //             for &new_target in _edges[output].iter() {
                    //                 target_worklist.push(OrderReversed::new((new_time.clone(), new_target, diff)));
                    //             }
                    //         }
                    //     }
                    // }

                    // Version 2.
                    for &(new_target, ref summary) in _compiled.iter() {
                        if let Some(new_time) = summary.results_in(time) {
                            target_worklist.push(OrderReversed::new((new_time.clone(), new_target, diff)));
                        }
                    }

                })
            }

        }
    }

    /// A mutable reference to the pushed results of changes.
    pub fn pushed(&mut self) -> &mut ChangeBatch<(Target, T)> {
        &mut self.pushed_changes
    }
}


/// Wrapper that inverts the ordering of the wrapped `element`.
///
/// `BinaryHeap` is a max-heap, so wrapping its entries in `OrderReversed` makes
/// the heap yield the least element first.
#[derive(PartialEq, Eq, Debug)]
struct OrderReversed<T> {
    /// The wrapped value; wrappers compare as their elements do, with operands swapped.
    pub element: T,
}

impl<T> OrderReversed<T> {
    /// Wraps `element` so that it compares in reverse.
    fn new(element: T) -> Self {
        OrderReversed { element: element }
    }
}

impl<T: PartialOrd> PartialOrd for OrderReversed<T> {
    /// Compares `other`'s element against `self`'s, flipping the usual result.
    fn partial_cmp(&self, other: &Self) -> Option<::std::cmp::Ordering> {
        let (lhs, rhs) = (&other.element, &self.element);
        PartialOrd::partial_cmp(lhs, rhs)
    }
}
impl<T: Ord> Ord for OrderReversed<T> {
    /// Compares `other`'s element against `self`'s, flipping the usual result.
    fn cmp(&self, other: &Self) -> ::std::cmp::Ordering {
        let (lhs, rhs) = (&other.element, &self.element);
        Ord::cmp(lhs, rhs)
    }
}