1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103
//! Traits and types for replaying captured timely dataflow streams. //! //! A type can be replayed into any timely dataflow scope if it presents as an //! iterator whose `Item` type implements `EventIterator` with the same timestamp. //! Other types can implement the `ReplayInto` trait, but this should be done with //! care, as there is a protocol the replayer follows that must be respected if the //! computation is to make sense. //! //! #Protocol //! //! The stream of events produced by each `EventIterator` implementation must satisfy, //! starting from a default timestamp of `Default::default()` with count 1, //! //! 1. The progress messages may only increment the count for a timestamp if //! the cumulative count for some prior or equal timestamp is positive. //! 2. The data messages map only use a timestamp if the cumulative count for //! some prior or equal timestamp is positive. //! //! Alternately, the sequence of events should, starting from an initial count of 1 //! for the timestamp `Default::default()`, describe decrements to held capabilities //! or the production of capabilities in their future, or messages sent at times in //! the future of held capabilities. //! //! The order is very important here. One can move `Event::Message` events arbitrarily //! earlier in the sequence, and `Event::Progress` events arbitrarily later, but one //! cannot move a `Event::Progress` message that discards a last capability before any //! `Event::Message` that would use that capability. //! //! For an example, the `Operate<T>` implementation for `capture::CaptureOperator<T, D, P>` //! records exactly what data is presented at the operator, both in terms of progress //! messages and data received. //! //! #Notes //! //! Provided no stream of events reports the consumption of capabilities it does not hold, //! any interleaving of the streams of events will still maintain the invariants above. //! This means that each timely dataflow replay operator can replay any number of streams, //! allowing the replay to occur in a timely dataflow computation with more or fewer workers //! than that in which the stream was captured. use ::Data; use dataflow::{Scope, Stream}; use dataflow::channels::pushers::Counter as PushCounter; use dataflow::channels::pushers::buffer::Buffer as PushBuffer; use dataflow::operators::generic::builder_raw::OperatorBuilder; use progress::Timestamp; use super::Event; use super::event::EventIterator; /// Replay a capture stream into a scope with the same timestamp. pub trait Replay<T: Timestamp, D: Data> { /// Replays `self` into the provided scope, as a `Stream<S, D>`. fn replay_into<S: Scope<Timestamp=T>>(self, scope: &mut S) -> Stream<S, D>; } impl<T: Timestamp, D: Data, I> Replay<T, D> for I where I : IntoIterator, <I as IntoIterator>::Item: EventIterator<T, D>+'static { fn replay_into<S: Scope<Timestamp=T>>(self, scope: &mut S) -> Stream<S, D>{ let mut builder = OperatorBuilder::new("Replay".to_owned(), scope.clone()); let (targets, stream) = builder.new_output(); let mut output = PushBuffer::new(PushCounter::new(targets)); let mut event_streams = self.into_iter().collect::<Vec<_>>(); let mut started = false; builder.build( move |_frontier| { }, move |_consumed, internal, produced| { if !started { // The first thing we do is modify our capabilities to match the number of streams we manage. // This should be a simple change of `self.event_streams.len() - 1`. We only do this once, as // our very first action. internal[0].update(Default::default(), (event_streams.len() as i64) - 1); started = true; } for event_stream in event_streams.iter_mut() { while let Some(event) = event_stream.next() { match *event { Event::Progress(ref vec) => { internal[0].extend(vec.iter().cloned()); }, Event::Messages(ref time, ref data) => { output.session(time).give_iterator(data.iter().cloned()); } } } } output.cease(); output.inner().produced().borrow_mut().drain_into(&mut produced[0]); false } ); stream } }