1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395
//! Manages pointstamp reachability within a graph. //! //! Timely dataflow is concerned with understanding and communicating the potential //! for capabilites to reach nodes in a directed graph, by following paths through //! the graph (along edges and through nodes). This module contains one abstraction //! for managing this information. //! //! #Examples //! //! ```rust //! use timely::progress::frontier::Antichain; //! use timely::progress::nested::subgraph::{Source, Target}; //! use timely::progress::nested::reachability_neu::{Builder, Tracker}; //! //! // allocate a new empty topology builder. //! let mut builder = Builder::<usize>::new(); //! //! // Each node with one input connected to one output. //! builder.add_node(0, 1, 1, vec![vec![Antichain::from_elem(0)]]); //! builder.add_node(1, 1, 1, vec![vec![Antichain::from_elem(0)]]); //! builder.add_node(2, 1, 1, vec![vec![Antichain::from_elem(1)]]); //! //! // Connect nodes in sequence, looping around to the first from the last. //! builder.add_edge(Source { index: 0, port: 0}, Target { index: 1, port: 0} ); //! builder.add_edge(Source { index: 1, port: 0}, Target { index: 2, port: 0} ); //! builder.add_edge(Source { index: 2, port: 0}, Target { index: 0, port: 0} ); //! //! // Construct a reachability tracker. //! let mut tracker = builder.build(); //! //! // Introduce a pointstamp at the output of the first node. //! tracker.update_source(Source { index: 0, port: 0}, 17, 1); //! //! // Propagate changes; until this call updates are simply buffered. //! tracker.propagate_all(); //! //! let mut results = tracker.pushed().drain().collect::<Vec<_>>(); //! results.sort(); //! //! assert_eq!(results.len(), 3); //! assert_eq!(results[0], ((Target { index: 0, port: 0 }, 18), 1)); //! assert_eq!(results[1], ((Target { index: 1, port: 0 }, 17), 1)); //! assert_eq!(results[2], ((Target { index: 2, port: 0 }, 17), 1)); //! //! // Introduce a pointstamp at the output of the first node. //! tracker.update_source(Source { index: 0, port: 0}, 17, -1); //! //! // Propagate changes; until this call updates are simply buffered. //! tracker.propagate_all(); //! //! let mut results = tracker.pushed().drain().collect::<Vec<_>>(); //! results.sort(); //! //! assert_eq!(results.len(), 3); //! assert_eq!(results[0], ((Target { index: 0, port: 0 }, 18), -1)); //! assert_eq!(results[1], ((Target { index: 1, port: 0 }, 17), -1)); //! assert_eq!(results[2], ((Target { index: 2, port: 0 }, 17), -1)); //! ``` use std::collections::BinaryHeap; use progress::Timestamp; use progress::nested::{Source, Target}; use progress::ChangeBatch; use progress::frontier::{Antichain, MutableAntichain}; use progress::timestamp::PathSummary; /// A topology builder, which can summarize reachability along paths. /// /// A `Builder` takes descriptions of the nodes and edges in a graph, and compiles /// a static summary of the minimal actions a timestamp must endure going from any /// input or output port to a destination input port. /// /// A graph is provides as (i) several indexed nodes, each with some number of input /// and output ports, and each with a summary of the internal paths connecting each /// input to each output, and (ii) a set of edges connecting output ports to input /// ports. Edges do not adjust timestamps; only nodes do this. /// /// The resulting summary describes, for each origin port in the graph and destination /// input port, a set of incomparable path summaries, each describing what happens to /// a timestamp as it moves along the path. There may be multiple summaries for each /// part of origin and destination due to the fact that the actions on timestamps may /// not be totally ordered (e.g., "increment the timestamp" and "take the maximum of /// the timestamp and seven"). /// /// #Examples /// /// ```rust /// use timely::progress::frontier::Antichain; /// use timely::progress::nested::subgraph::{Source, Target}; /// use timely::progress::nested::reachability_neu::Builder; /// /// // allocate a new empty topology builder. /// let mut builder = Builder::<usize>::new(); /// /// // Each node with one input connected to one output. /// builder.add_node(0, 1, 1, vec![vec![Antichain::from_elem(0)]]); /// builder.add_node(1, 1, 1, vec![vec![Antichain::from_elem(0)]]); /// builder.add_node(2, 1, 1, vec![vec![Antichain::from_elem(1)]]); /// /// // Connect nodes in sequence, looping around to the first from the last. /// builder.add_edge(Source { index: 0, port: 0}, Target { index: 1, port: 0} ); /// builder.add_edge(Source { index: 1, port: 0}, Target { index: 2, port: 0} ); /// builder.add_edge(Source { index: 2, port: 0}, Target { index: 0, port: 0} ); /// /// // Summarize reachability information. /// let tracker = builder.build(); /// ``` #[derive(Clone, Debug)] pub struct Builder<T: Timestamp> { /// Internal connections within hosted operators. /// /// Indexed by operator index, then input port, then output port. This is the /// same format returned by `get_internal_summary`, as if we simply appended /// all of the summaries for the hosted nodes. pub nodes: Vec<Vec<Vec<Antichain<T::Summary>>>>, /// Direct connections from sources to targets. /// /// Edges do not affect timestamps, so we only need to know the connectivity. /// Indexed by operator index then output port. pub edges: Vec<Vec<Vec<Target>>>, /// Numbers of inputs and outputs for each node. pub shape: Vec<(usize, usize)>, } impl<T: Timestamp> Builder<T> { /// Create a new empty topology builder. pub fn new() -> Self { Builder { nodes: Vec::new(), edges: Vec::new(), shape: Vec::new(), } } /// Add links internal to operators. /// /// This method overwrites any existing summary, instead of anything more sophisticated. pub fn add_node(&mut self, index: usize, inputs: usize, outputs: usize, summary: Vec<Vec<Antichain<T::Summary>>>) { // Assert that all summaries exist. debug_assert_eq!(inputs, summary.len()); for x in summary.iter() { debug_assert_eq!(outputs, x.len()); } while self.nodes.len() <= index { self.nodes.push(Vec::new()); self.edges.push(Vec::new()); self.shape.push((0, 0)); } self.nodes[index] = summary; if self.edges[index].len() != outputs { self.edges[index] = vec![Vec::new(); outputs]; } self.shape[index] = (inputs, outputs); } /// Add links between operators. /// /// This method does not check that the associated nodes and ports exist. References to /// missing nodes or ports are discovered in `build`. pub fn add_edge(&mut self, source: Source, target: Target) { // Assert that the edge is between existing ports. debug_assert!(source.port < self.shape[source.index].1); debug_assert!(target.port < self.shape[target.index].0); self.edges[source.index][source.port].push(target); } /// Compiles the current nodes and edges into immutable path summaries. /// /// This method has the opportunity to perform some error checking that the path summaries /// are valid, including references to undefined nodes and ports, as well as self-loops with /// default summaries (a serious liveness issue). pub fn build(&self) -> Tracker<T> { Tracker::allocate_from(self) } } /// An interactive tracker of propagated reachability information. /// /// A `Tracker` tracks, for a fixed graph topology, the consequences of /// pointstamp changes at various node input and output ports. These changes may /// alter the potential pointstamps that could arrive at downstream input ports. #[derive(Debug)] pub struct Tracker<T:Timestamp> { // TODO: All of the sizes of these allocations are static (except internal to `ChangeBatch`). // It seems we should be able to flatten most of these so that there are a few allocations // independent of the numbers of nodes and ports and such. // // TODO: We could also change the internal representation to be a graph of targets, using usize // identifiers for each, so that internally we needn't use multiple levels of indirection. // This may make more sense once we commit to topologically ordering the targets. /// Each source and target has a mutable antichain to ensure that we track their discrete frontiers, /// rather than their multiplicities. We separately track the frontiers resulting from propagated /// frontiers, to protect them from transient negativity in inbound target updates. sources: Vec<Vec<MutableAntichain<T>>>, targets: Vec<Vec<MutableAntichain<T>>>, pusheds: Vec<Vec<MutableAntichain<T>>>, /// Source and target changes are buffered, which allows us to delay processing until propagation, /// and so consolidate updates, but to leap directly to those frontiers that may have changed. source_changes: ChangeBatch<(Source, T)>, target_changes: ChangeBatch<(Target, T)>, /// Worklist of updates to perform, ordered by increasing timestamp and target. target_worklist: BinaryHeap<OrderReversed<(T, Target, i64)>>, /// Buffer of consequent changes. pushed_changes: ChangeBatch<(Target, T)>, /// Internal connections within hosted operators. /// /// Indexed by operator index, then input port, then output port. This is the /// same format returned by `get_internal_summary`, as if we simply appended /// all of the summaries for the hosted nodes. nodes: Vec<Vec<Vec<Antichain<T::Summary>>>>, /// Direct connections from sources to targets. /// /// Edges do not affect timestamps, so we only need to know the connectivity. /// Indexed by operator index then output port. edges: Vec<Vec<Vec<Target>>>, /// Compiled reach from each target to targets one hop downstream. compiled: Vec<Vec<Vec<(Target, T::Summary)>>>, } impl<T:Timestamp> Tracker<T> { /// Updates the count for a time at a target. #[inline] pub fn update_target(&mut self, target: Target, time: T, value: i64) { self.target_changes.update((target, time), value); } /// Updates the count for a time at a source. #[inline] pub fn update_source(&mut self, source: Source, time: T, value: i64) { self.source_changes.update((source, time), value); } /// Allocate a new `Tracker` using the shape from `summaries`. pub fn allocate_from(builder: &Builder<T>) -> Self { let mut sources = Vec::with_capacity(builder.shape.len()); let mut targets = Vec::with_capacity(builder.shape.len()); let mut pusheds = Vec::with_capacity(builder.shape.len()); let mut compiled = Vec::with_capacity(builder.shape.len()); // Allocate buffer space for each input and input port. for (node, &(inputs, outputs)) in builder.shape.iter().enumerate() { sources.push(vec![MutableAntichain::new(); outputs]); targets.push(vec![MutableAntichain::new(); inputs]); pusheds.push(vec![MutableAntichain::new(); inputs]); let mut compiled_node = vec![Vec::new(); inputs]; for input in 0 .. inputs { for output in 0 .. outputs { for summary in builder.nodes[node][input][output].elements().iter() { for &target in builder.edges[node][output].iter() { compiled_node[input].push((target, summary.clone())); } } } } compiled.push(compiled_node); } Tracker { sources, targets, pusheds, source_changes: ChangeBatch::new(), target_changes: ChangeBatch::new(), target_worklist: BinaryHeap::new(), pushed_changes: ChangeBatch::new(), nodes: builder.nodes.clone(), edges: builder.edges.clone(), compiled, } } /// Propagates all pending updates. pub fn propagate_all(&mut self) { // Filter each target change through `self.targets`. for ((target, time), diff) in self.target_changes.drain() { let target_worklist = &mut self.target_worklist; self.targets[target.index][target.port].update_iter_and(Some((time, diff)), |time, diff| { target_worklist.push(OrderReversed::new((time.clone(), target, diff))) }) } // Filter each source change through `self.sources` and then along edges. for ((source, time), diff) in self.source_changes.drain() { let target_worklist = &mut self.target_worklist; let edges = &self.edges[source.index][source.port]; self.sources[source.index][source.port].update_iter_and(Some((time, diff)), |time, diff| { for &target in edges.iter() { target_worklist.push(OrderReversed::new((time.clone(), target, diff))) } }) } while !self.target_worklist.is_empty() { // This iteration we will drain all (target, time) work items. let (time, target, mut diff) = self.target_worklist.pop().unwrap().element; // Drain any other updates that might have the same time; accumulate difference. while self.target_worklist.peek().map(|x| (x.element.0 == time) && (x.element.1 == target)).unwrap_or(false) { diff += self.target_worklist.pop().unwrap().element.2; } // Only act if there is a net change, positive or negative. if diff != 0 { // Borrow various self fields to appease Rust. let pushed_changes = &mut self.pushed_changes; let target_worklist = &mut self.target_worklist; let _edges = &self.edges[target.index]; let _nodes = &self.nodes[target.index][target.port]; let _compiled = &self.compiled[target.index][target.port]; // Although single-element updates may seem wasteful, they are important for termination. self.pusheds[target.index][target.port].update_iter_and(Some((time, diff)), |time, diff| { // Identify the change in the out change list. pushed_changes.update((target, time.clone()), diff); // When a target has its frontier change we should communicate the change to downstream // targets as well. This means traversing the operator to its outputs. // // We have two implementations, first traversing `nodes` and `edges`, and second using // a compiled representation that flattens all these lists (but has redundant calls to // `results_in`). // // Version 1. // for (output, summaries) in _nodes.iter().enumerate() { // for summary in summaries.elements().iter() { // if let Some(new_time) = summary.results_in(time) { // for &new_target in _edges[output].iter() { // target_worklist.push(OrderReversed::new((new_time.clone(), new_target, diff))); // } // } // } // } // Version 2. for &(new_target, ref summary) in _compiled.iter() { if let Some(new_time) = summary.results_in(time) { target_worklist.push(OrderReversed::new((new_time.clone(), new_target, diff))); } } }) } } } /// A mutable reference to the pushed results of changes. pub fn pushed(&mut self) -> &mut ChangeBatch<(Target, T)> { &mut self.pushed_changes } } #[derive(PartialEq, Eq, Debug)] struct OrderReversed<T> { pub element: T, } impl<T> OrderReversed<T> { fn new(element: T) -> Self { OrderReversed { element } } } impl<T: PartialOrd> PartialOrd for OrderReversed<T> { fn partial_cmp(&self, other: &Self) -> Option<::std::cmp::Ordering> { other.element.partial_cmp(&self.element) } } impl<T: Ord> Ord for OrderReversed<T> { fn cmp(&self, other: &Self) -> ::std::cmp::Ordering { other.element.cmp(&self.element) } }