//! Manages pointstamp reachability within a graph.
//!
//! Timely dataflow is concerned with understanding and communicating the potential
//! for capabilities to reach nodes in a directed graph, by following paths through
//! the graph (along edges and through nodes). This module contains one abstraction
//! for managing this information.
//!
//! # Examples
//!
//! ```rust
//! use timely::progress::frontier::Antichain;
//! use timely::progress::nested::subgraph::{Source, Target};
//! use timely::progress::nested::reachability::{Builder, Tracker};
//!
//! // allocate a new empty topology builder.
//! let mut builder = Builder::<usize>::new();
//!
//! // Each node with one input connected to one output.
//! builder.add_node(0, 1, 1, vec![vec![Antichain::from_elem(0)]]);
//! builder.add_node(1, 1, 1, vec![vec![Antichain::from_elem(0)]]);
//! builder.add_node(2, 1, 1, vec![vec![Antichain::from_elem(1)]]);
//!
//! // Connect nodes in sequence, looping around to the first from the last.
//! builder.add_edge(Source { index: 0, port: 0}, Target { index: 1, port: 0} );
//! builder.add_edge(Source { index: 1, port: 0}, Target { index: 2, port: 0} );
//! builder.add_edge(Source { index: 2, port: 0}, Target { index: 0, port: 0} );
//!
//! // Construct a reachability tracker.
//! let mut tracker = Tracker::allocate_from(builder.summarize());
//!
//! // Introduce a pointstamp at the output of the first node.
//! tracker.update_source(Source { index: 0, port: 0}, 17, 1);
//!
//! // Propagate changes; until this call updates are simply buffered.
//! tracker.propagate_all();
//!
//! // Propagated changes should have a single element, incremented for node zero.
//! assert_eq!(tracker.pushed_mut(0)[0].drain().collect::<Vec<_>>(), vec![(18, 1)]);
//! assert_eq!(tracker.pushed_mut(1)[0].drain().collect::<Vec<_>>(), vec![(17, 1)]);
//! assert_eq!(tracker.pushed_mut(2)[0].drain().collect::<Vec<_>>(), vec![(17, 1)]);
//! ```

use progress::Timestamp;
use progress::nested::{Source, Target};
use progress::ChangeBatch;

use progress::frontier::{Antichain, MutableAntichain};
use progress::timestamp::PathSummary;

use order::PartialOrder;

/// A topology builder, which can summarize reachability along paths.
///
/// A `Builder` takes descriptions of the nodes and edges in a graph, and compiles
/// a static summary of the minimal actions a timestamp must endure going from any
/// input or output port to a destination input port.
///
/// A graph is provided as (i) several indexed nodes, each with some number of input
/// and output ports, and each with a summary of the internal paths connecting each
/// input to each output, and (ii) a set of edges connecting output ports to input
/// ports. Edges do not adjust timestamps; only nodes do this.
///
/// The resulting summary describes, for each origin port in the graph and destination
/// input port, a set of incomparable path summaries, each describing what happens to
/// a timestamp as it moves along the path. There may be multiple summaries for each
/// pair of origin and destination because the actions on timestamps may not be
/// totally ordered (e.g., "increment the timestamp" and "take the maximum of the
/// timestamp and seven").
///
/// # Examples
///
/// ```rust
/// use timely::progress::frontier::Antichain;
/// use timely::progress::nested::subgraph::{Source, Target};
/// use timely::progress::nested::reachability::Builder;
///
/// // allocate a new empty topology builder.
/// let mut builder = Builder::<usize>::new();
///
/// // Each node with one input connected to one output.
/// builder.add_node(0, 1, 1, vec![vec![Antichain::from_elem(0)]]);
/// builder.add_node(1, 1, 1, vec![vec![Antichain::from_elem(0)]]);
/// builder.add_node(2, 1, 1, vec![vec![Antichain::from_elem(1)]]);
///
/// // Connect nodes in sequence, looping around to the first from the last.
/// builder.add_edge(Source { index: 0, port: 0}, Target { index: 1, port: 0} );
/// builder.add_edge(Source { index: 1, port: 0}, Target { index: 2, port: 0} );
/// builder.add_edge(Source { index: 2, port: 0}, Target { index: 0, port: 0} );
///
/// // Summarize reachability information.
/// let summary = builder.summarize();
/// ```
#[derive(Clone, Debug)]
pub struct Builder<T: Timestamp> {
    /// Internal connections within hosted operators.
    ///
    /// Indexed by operator index, then input port, then output port. This is the
    /// same format returned by `get_internal_summary`, as if we simply appended
    /// all of the summaries for the hosted nodes.
    nodes: Vec<Vec<Vec<Antichain<T::Summary>>>>,
    /// Direct connections from sources to targets.
    ///
    /// Edges do not affect timestamps, so we only need to know the connectivity.
    /// Indexed by operator index then output port.
    edges: Vec<Vec<Vec<Target>>>,
    /// Numbers of inputs and outputs for each node.
    shape: Vec<(usize, usize)>,
}

impl<T: Timestamp> Builder<T> {

    /// Create a new empty topology builder.
    pub fn new() -> Self {
        Builder {
            nodes: Vec::new(),
            edges: Vec::new(),
            shape: Vec::new(),
        }
    }

    /// Add links internal to operators.
    ///
    /// This method overwrites any existing summary, instead of anything more sophisticated.
    pub fn add_node(&mut self, index: usize, inputs: usize, outputs: usize, summary: Vec<Vec<Antichain<T::Summary>>>) {

        // Assert that all summaries exist.
        debug_assert_eq!(inputs, summary.len());
        for x in summary.iter() { debug_assert_eq!(outputs, x.len()); }

        while self.nodes.len() <= index {
            self.nodes.push(Vec::new());
            self.edges.push(Vec::new());
            self.shape.push((0, 0));
        }

        self.nodes[index] = summary;
        if self.edges[index].len() != outputs {
            self.edges[index] = vec![Vec::new(); outputs];
        }
        self.shape[index] = (inputs, outputs);
    }

    /// Add links between operators.
    ///
    /// The associated ports are checked against the declared node shapes only by
    /// debug assertions. Any remaining references to missing nodes or ports are
    /// discovered in `summarize`.
    pub fn add_edge(&mut self, source: Source, target: Target) {

        // Assert that the edge is between existing ports.
        debug_assert!(source.port < self.shape[source.index].1);
        debug_assert!(target.port < self.shape[target.index].0);

        self.edges[source.index][source.port].push(target);
    }

    /// Compiles the current nodes and edges into immutable path summaries.
    ///
    /// This method has the opportunity to perform some error checking that the path summaries
    /// are valid, including references to undefined nodes and ports, as well as self-loops with
    /// default summaries (a serious liveness issue).
    pub fn summarize(&mut self) -> Summary<T> {

        // We maintain a list of new ((source, target), path_summary) entries whose implications
        // have not yet been fully explored. While such entries exist, we consider the next and
        // explore its implications by considering all incident target-source' connections (from
        // `self.nodes`) followed by all source'-target' connections (from `self.edges`). This may
        // yield ((source, target'), path_summary) entries, and we enqueue any new ones in our list.
        let mut work = ::std::collections::VecDeque::<((Source, Target), T::Summary)>::new();

        // Initialize `work` with all edges in the graph, each with a `Default::default()` summary.
        for index in 0 .. self.edges.len() {
            for port in 0 .. self.edges[index].len() {
                for &target in &self.edges[index][port] {
                    work.push_back(((Source { index: index, port: port}, target), Default::default()));
                }
            }
        }

        // Prepare space for path summaries.
        let mut source_target: Vec<Vec<Vec<(Target, Antichain<T::Summary>)>>> = Vec::new();
        let mut target_target: Vec<Vec<Vec<(Target, Antichain<T::Summary>)>>> = Vec::new();

        for &(inputs, outputs) in self.shape.iter() {
            source_target.push(vec![Vec::new(); outputs]);
            target_target.push(vec![Vec::new(); inputs]);
        }

        // Establish all source-target path summaries by fixed-point computation.
        while let Some(((source, target), summary)) = work.pop_front() {
            // Try to add the summary; if it comes back as "novel" we should explore
            // its two-hop connections.
            if add_summary(&mut source_target[source.index][source.port], target, summary.clone()) {
                for (new_source_port, internal_summaries) in self.nodes[target.index][target.port].iter().enumerate() {
                    for internal_summary in internal_summaries.elements() {
                        if let Some(new_summary) = summary.followed_by(internal_summary) {
                            for &new_target in self.edges[target.index][new_source_port].iter() {
                                work.push_back(((source, new_target), new_summary.clone()));
                            }
                        }
                    }
                }
            }
        }

        // Extend source-target path summaries by one target'-source connection, to yield all
        // target'-target path summaries. This computes summaries along non-empty paths, so that
        // we can test for trivial summaries along non-trivial paths.
        for index in 0 .. self.nodes.len() {
            for input_port in 0 .. self.nodes[index].len() {
                // For each output port, consider source-target summaries.
                for (output_port, internal_summaries) in self.nodes[index][input_port].iter().enumerate() {
                    for internal_summary in internal_summaries.elements() {
                        for &(target, ref new_summaries) in source_target[index][output_port].iter() {
                            for new_summary in new_summaries.elements() {
                                if let Some(summary) = internal_summary.followed_by(new_summary) {
                                    add_summary(&mut target_target[index][input_port], target, summary);
                                }
                            }
                        }
                    }
                }
            }
        }

        // Test for trivial summaries along self-loops.
        #[cfg(debug_assertions)]
        {
            for node in 0 .. target_target.len() {
                for port in 0 .. target_target[node].len() {
                    let this_target = Target { index: node, port: port };
                    for &(ref target, ref summary) in target_target[node][port].iter() {
                        if target == &this_target && summary.less_equal(&Default::default()) {
                            panic!("Default summary found along self-loop: {:?}", target);
                        }
                    }
                }
            }
        }

        // Incorporate trivial self-loops, as changes at a target do apply to the target.
        for index in 0 .. self.nodes.len() {
            for input_port in 0 .. self.nodes[index].len() {
                add_summary(
                    &mut target_target[index][input_port],
                    Target { index: index, port: input_port },
                    Default::default(),
                );
            }
        }

        Summary {
            source_target,
            target_target,
        }
    }
}

/// A summary of minimal path summaries in a timely dataflow graph.
///
/// A `Summary` instance records a compiled representation of path summaries along paths
/// in a timely dataflow graph, most commonly constructed by a `reachability::Builder`.
#[derive(Clone)]
pub struct Summary<T: Timestamp> {

    // TODO: As all of this information is static, we should be able to flatten it into
    //       fewer allocations, reducing the size and potentially the cost of traversing
    //       the summaries. The access patterns appear to be highly sequential, and look
    //       like straight swings through nodes and ports (iterating on summaries, once
    //       for each update to process).

    /// Compiled source-to-target reachability.
    ///
    /// Entry `source_target[node][port]` lists pairs of target and summaries that can be
    /// reached from the (node, port) output port.
    pub source_target: Vec<Vec<Vec<(Target, Antichain<T::Summary>)>>>,
    /// Compiled target-to-target reachability.
    ///
    /// Entry `target_target[node][port]` lists pairs of target and summaries that can be
    /// reached from the (node, port) input port.
    pub target_target: Vec<Vec<Vec<(Target, Antichain<T::Summary>)>>>,
}

/// An interactive tracker of propagated reachability information.
///
/// A `Tracker` tracks, for a fixed graph topology, the consequences of
/// pointstamp changes at various node input and output ports. These changes may
/// alter the potential pointstamps that could arrive at downstream input ports.
///
/// A `Tracker` instance is constructed from a reachability summary, by
/// way of its `allocate_from` method. With a fixed topology, users can interactively
/// call `update_target` and `update_source` to change observed pointstamp counts
/// at node inputs and outputs, respectively. These changes are buffered until a
/// user invokes either `propagate_all` or `propagate_node`, which consume buffered
/// changes and propagate their consequences along the graph to any other port that
/// can be reached. These changes can be read for each node using `pushed_mut`.
///
/// # Examples
///
/// ```rust
/// use timely::progress::frontier::Antichain;
/// use timely::progress::nested::subgraph::{Source, Target};
/// use timely::progress::nested::reachability::{Builder, Tracker};
///
/// // allocate a new empty topology builder.
/// let mut builder = Builder::<usize>::new();
///
/// // Each node with one input connected to one output.
/// builder.add_node(0, 1, 1, vec![vec![Antichain::from_elem(0)]]);
/// builder.add_node(1, 1, 1, vec![vec![Antichain::from_elem(0)]]);
/// builder.add_node(2, 1, 1, vec![vec![Antichain::from_elem(1)]]);
///
/// // Connect nodes in sequence, looping around to the first from the last.
/// builder.add_edge(Source { index: 0, port: 0}, Target { index: 1, port: 0} );
/// builder.add_edge(Source { index: 1, port: 0}, Target { index: 2, port: 0} );
/// builder.add_edge(Source { index: 2, port: 0}, Target { index: 0, port: 0} );
///
/// // Construct a reachability tracker.
/// let mut tracker = Tracker::allocate_from(builder.summarize());
///
/// // Introduce a pointstamp at the output of the first node.
/// tracker.update_source(Source { index: 0, port: 0}, 17, 1);
///
/// // Propagate changes; until this call updates are simply buffered.
/// tracker.propagate_all();
///
/// // Propagated changes should have a single element, incremented for node zero.
/// assert_eq!(tracker.pushed_mut(0)[0].drain().collect::<Vec<_>>(), vec![(18, 1)]);
/// assert_eq!(tracker.pushed_mut(1)[0].drain().collect::<Vec<_>>(), vec![(17, 1)]);
/// assert_eq!(tracker.pushed_mut(2)[0].drain().collect::<Vec<_>>(), vec![(17, 1)]);
/// ```
#[derive(Default, Debug)]
pub struct Tracker<T: Timestamp> {

    // TODO: All of the sizes of these allocations are static (except internal to `ChangeBatch`).
    //       It seems we should be able to flatten most of these so that there are a few allocations
    //       independent of the numbers of nodes and ports and such.

    sources: Vec<Vec<MutableAntichain<T>>>,
    targets: Vec<Vec<MutableAntichain<T>>>,

    /// Buffers of consequent propagated changes.
    pusheds: Vec<Vec<ChangeBatch<T>>>,

    /// Compiled reachability along edges and through internal connections.
    source_target: Vec<Vec<Vec<(Target, Antichain<T::Summary>)>>>,
    target_target: Vec<Vec<Vec<(Target, Antichain<T::Summary>)>>>,
}

impl<T: Timestamp> Tracker<T> {

    /// Updates the count for a time at a target.
    #[inline]
    pub fn update_target(&mut self, target: Target, time: T, value: i64) {
        self.targets[target.index][target.port].update_dirty(time, value);
    }

    /// Updates the count for a time at a source.
    #[inline]
    pub fn update_source(&mut self, source: Source, time: T, value: i64) {
        self.sources[source.index][source.port].update_dirty(time, value);
    }

    /// Returns references to per-node state describing input and output frontiers,
    /// and any pending updates to propagated consequences.
    pub fn node_state(&mut self, index: usize) -> (&[MutableAntichain<T>], &[MutableAntichain<T>], &mut [ChangeBatch<T>]) {
        (&self.targets[index], &self.sources[index], &mut self.pusheds[index][..])
    }

    /// Reference to the target mutable antichains.
    pub fn target(&mut self, index: usize) -> &mut [MutableAntichain<T>] {
        &mut self.targets[index]
    }

    /// Reference to the source mutable antichains.
    pub fn source(&mut self, index: usize) -> &mut [MutableAntichain<T>] {
        &mut self.sources[index]
    }

    /// Clears all source and target pointstamp counts, and all buffered pushed changes.
    pub fn clear(&mut self) {
        for vec in &mut self.sources { for map in vec.iter_mut() { map.clear(); } }
        for vec in &mut self.targets { for map in vec.iter_mut() { map.clear(); } }
        for vec in &mut self.pusheds { for map in vec.iter_mut() { map.clear(); } }
    }

    /// Returns true if no node has pending pushed changes.
    pub fn is_empty(&mut self) -> bool {
        self.pusheds.iter_mut().all(|x| x.iter_mut().all(|y| y.is_empty()))
    }

    /// Returns true if any source or target is non-empty.
    pub fn tracking_anything(&mut self) -> bool {
        self.sources.iter_mut().any(|x| x.iter_mut().any(|y| !y.is_empty())) ||
        self.targets.iter_mut().any(|x| x.iter_mut().any(|y| !y.is_empty()))
    }

    /// Allocate a new `Tracker` using the shape from `summaries`.
    pub fn allocate_from(summary: Summary<T>) -> Self {

        let source_target = summary.source_target;
        let target_target = summary.target_target;

        debug_assert_eq!(source_target.len(), target_target.len());

        let mut sources = Vec::with_capacity(source_target.len());
        let mut targets = Vec::with_capacity(target_target.len());
        let mut pusheds = Vec::with_capacity(target_target.len());

        // Allocate buffer space for the output ports (sources) of each node.
        for index in 0 .. source_target.len() {
            let source_count = source_target[index].len();
            sources.push(vec![MutableAntichain::new(); source_count]);
        }

        // Allocate buffer space for the input ports (targets) of each node.
        for index in 0 .. target_target.len() {
            let target_count = target_target[index].len();
            targets.push(vec![MutableAntichain::new(); target_count]);
            pusheds.push(vec![ChangeBatch::new(); target_count]);
        }

        Tracker {
            sources,
            targets,
            pusheds,
            source_target,
            target_target,
        }
    }

    /// Propagates updates from an indicated node.
    ///
    /// This method is potentially useful for propagating the consequences of a single
    /// node invocation, to make the results available immediately.
    pub fn propagate_node(&mut self, index: usize) {

        // Propagate changes at each input (target).
        for input in 0..self.targets[index].len() {

            let target_target = &self.target_target[index][input];
            let pusheds = &mut self.pusheds;

            self.targets[index][input].update_iter_and(None, |time, value| {
                for &(target, ref antichain) in target_target.iter() {
                    let pusheds = &mut pusheds[target.index][target.port];
                    for summary in antichain.elements().iter() {
                        if let Some(new_time) = summary.results_in(&time) {
                            pusheds.update(new_time, value);
                        }
                    }
                }
            });
        }

        // Propagate changes at each output (source).
        for output in 0..self.sources[index].len() {

            let source_target = &self.source_target[index][output];
            let pusheds = &mut self.pusheds;

            self.sources[index][output].update_iter_and(None, |time, value| {
                for &(target, ref antichain) in source_target.iter() {
                    let pusheds = &mut pusheds[target.index][target.port];
                    for summary in antichain.elements().iter() {
                        if let Some(new_time) = summary.results_in(&time) {
                            pusheds.update(new_time, value);
                        }
                    }
                }
            });
        }
    }

    /// Propagates all updates made to sources and targets.
    pub fn propagate_all(&mut self) {
        for index in 0..self.targets.len() {
            self.propagate_node(index);
        }
    }

    /// Provides access to pushed changes for a node.
    ///
    /// The caller may read the results or consume the results, as appropriate. The method
    /// itself does not clear the buffer, so pushed values will stay in place until they are
    /// consumed by some caller.
    #[inline(always)]
    pub fn pushed_mut(&mut self, node: usize) -> &mut [ChangeBatch<T>] {
        &mut self.pusheds[node][..]
    }
}

/// Adds the path summary `summary` to `target` and returns true iff a change occurred.
fn add_summary<S: PartialOrder+Eq>(vector: &mut Vec<(Target, Antichain<S>)>, target: Target, summary: S) -> bool {
    for &mut (ref t, ref mut antichain) in vector.iter_mut() {
        if target.eq(t) {
            return antichain.insert(summary);
        }
    }
    vector.push((target, Antichain::from_elem(summary)));
    true
}