1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
use std::cell::RefCell;
use progress::{Timestamp, Operate, SubgraphBuilder};
use progress::nested::{Source, Target};
use progress::nested::product::Product;
use timely_communication::{Allocate, Data};
use {Push, Pull};
use logging::Logger;
use super::{ScopeParent, Scope};
pub struct Child<'a, G: ScopeParent, T: Timestamp> {
pub subgraph: &'a RefCell<SubgraphBuilder<G::Timestamp, T>>,
pub parent: G,
pub logging: Logger,
}
impl<'a, G: ScopeParent, T: Timestamp> Child<'a, G, T> {
pub fn index(&self) -> usize { self.parent.index() }
pub fn peers(&self) -> usize { self.parent.peers() }
}
impl<'a, G: ScopeParent, T: Timestamp> ScopeParent for Child<'a, G, T> {
type Timestamp = Product<G::Timestamp, T>;
fn new_identifier(&mut self) -> usize {
self.parent.new_identifier()
}
}
impl<'a, G: ScopeParent, T: Timestamp> Scope for Child<'a, G, T> {
fn name(&self) -> String { self.subgraph.borrow().name.clone() }
fn addr(&self) -> Vec<usize> { self.subgraph.borrow().path.clone() }
fn add_edge(&self, source: Source, target: Target) {
self.subgraph.borrow_mut().connect(source, target);
}
fn add_operator_with_index<SC: Operate<Self::Timestamp>+'static>(&mut self, scope: SC, index: usize) {
let identifier = self.new_identifier();
self.subgraph.borrow_mut().add_child(Box::new(scope), index, identifier);
}
fn allocate_operator_index(&mut self) -> usize {
self.subgraph.borrow_mut().allocate_child_id()
}
#[inline]
fn scoped<T2: Timestamp, R, F: FnOnce(&mut Child<Self, T2>) -> R>(&mut self, func: F) -> R {
let index = self.subgraph.borrow_mut().allocate_child_id();
let path = self.subgraph.borrow().path.clone();
let subscope = RefCell::new(SubgraphBuilder::new_from(index, path, self.logging().clone()));
let result = {
let mut builder = Child {
subgraph: &subscope,
parent: self.clone(),
logging: self.logging.clone(),
};
func(&mut builder)
};
let subscope = subscope.into_inner().build(self);
self.add_operator_with_index(subscope, index);
result
}
fn logging(&self) -> Logger {
self.logging.clone()
}
}
impl<'a, G: ScopeParent, T: Timestamp> Allocate for Child<'a, G, T> {
fn index(&self) -> usize { self.parent.index() }
fn peers(&self) -> usize { self.parent.peers() }
fn allocate<D: Data>(&mut self) -> (Vec<Box<Push<D>>>, Box<Pull<D>>, Option<usize>) {
self.parent.allocate()
}
}
impl<'a, G: ScopeParent, T: Timestamp> Clone for Child<'a, G, T> {
fn clone(&self) -> Self { Child { subgraph: self.subgraph, parent: self.parent.clone(), logging: self.logging.clone() }}
}