1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
use std::rc::Rc;
use std::cell::RefCell;
use dataflow::channels::Content;
use progress::ChangeBatch;
use Pull;
pub struct Counter<T: Ord+Clone+'static, D, P: Pull<(T, Content<D>)>> {
pullable: P,
consumed: Rc<RefCell<ChangeBatch<T>>>,
phantom: ::std::marker::PhantomData<D>,
}
impl<T:Ord+Clone+'static, D, P: Pull<(T, Content<D>)>> Counter<T, D, P> {
#[inline]
pub fn next(&mut self) -> Option<(&T, &mut Content<D>)> {
if let Some((ref time, ref mut data)) = *self.pullable.pull() {
if data.len() > 0 {
self.consumed.borrow_mut().update(time.clone(), data.len() as i64);
Some((time, data))
}
else { None }
}
else { None }
}
}
impl<T:Ord+Clone+'static, D, P: Pull<(T, Content<D>)>> Counter<T, D, P> {
pub fn new(pullable: P) -> Self {
Counter {
phantom: ::std::marker::PhantomData,
pullable: pullable,
consumed: Rc::new(RefCell::new(ChangeBatch::new())),
}
}
pub fn consumed(&self) -> &Rc<RefCell<ChangeBatch<T>>> {
&self.consumed
}
}