1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
use progress::Timestamp;
use progress::ChangeBatch;
use timely_communication::Allocate;
use {Push, Pull};
use logging::Logger;
/// A list of progress updates: each entry pairs a `(usize, usize, T)` key with an `i64` delta.
///
/// NOTE(review): the key is presumably (operator index, port, timestamp) — confirm against
/// the code that produces these updates; nothing in this file fixes that meaning.
pub type ProgressVec<T> = Vec<((usize, usize, T), i64)>;
/// A progress message as exchanged by `Progcaster`.
///
/// The components are, in order: the index of the sending worker, the sender's sequence
/// number, the `messages` updates, and the `internal` updates.
pub type ProgressMsg<T> = (usize, usize, ProgressVec<T>, ProgressVec<T>);
/// Sends progress updates through a set of pushers and collects the updates that arrive
/// on the matching puller.
pub struct Progcaster<T:Timestamp> {
/// Outgoing endpoints; every progress message is pushed to each of them.
pushers: Vec<Box<Push<ProgressMsg<T>>>>,
/// Incoming endpoint from which received progress messages are pulled.
puller: Box<Pull<ProgressMsg<T>>>,
/// Index of the local worker (from `Allocate::index`), stamped on every outgoing message.
source: usize,
/// Sequence number of the next outgoing message; starts at 0 and grows by one per broadcast.
counter: usize,
/// Copy of the `path` handed to `new`; attached to every logged progress event.
addr: Vec<usize>,
/// Channel identifier returned by the allocator, recorded in log events.
comm_channel: Option<usize>,
/// Logger that receives channel-creation and progress send/receive events.
logging: Logger,
}
impl<T:Timestamp+Send> Progcaster<T> {
    /// Creates a `Progcaster` backed by a channel freshly allocated from `allocator`.
    ///
    /// * `allocator` - supplies the pushers, the puller and the channel identifier, as well
    ///   as the local worker index that is used as the `source` of outgoing messages.
    /// * `path` - copied into the new instance and attached to every progress event it logs.
    /// * `logging` - logger kept for the lifetime of the instance. When it is enabled, the
    ///   newly allocated channel is announced as a `CommChannelKind::Progress` channel.
    pub fn new<A: Allocate>(allocator: &mut A, path: &Vec<usize>, logging: Logger) -> Progcaster<T> {
        let (pushers, puller, chan) = allocator.allocate();
        logging.when_enabled(|l| l.log(::logging::TimelyEvent::CommChannels(::logging::CommChannelsEvent {
            comm_channel: chan,
            comm_channel_kind: ::logging::CommChannelKind::Progress,
        })));
        Progcaster {
            pushers,
            puller,
            source: allocator.index(),
            counter: 0,
            addr: path.clone(),
            comm_channel: chan,
            logging,
        }
    }

    /// Sends the local updates to every pusher, then folds all pending incoming updates
    /// back into the same two batches.
    ///
    /// When there is at most one pusher the call does nothing and both batches are left
    /// exactly as supplied. Otherwise:
    ///
    /// 1. If either batch is non-empty, a send event is logged, the contents of both batches
    ///    are pushed to every pusher (tagged with this worker's index and the current
    ///    sequence number), the sequence number is advanced, and both batches are cleared.
    /// 2. Every message currently available from the puller is logged and its updates are
    ///    accumulated into `messages` and `internal` respectively.
    ///
    /// NOTE(review): the batches are cleared after sending and only refilled from the
    /// puller, so this presumably relies on the channel delivering our own broadcast back
    /// to us — confirm against the allocator's contract.
    pub fn send_and_recv(
        &mut self,
        messages: &mut ChangeBatch<(usize, usize, T)>,
        internal: &mut ChangeBatch<(usize, usize, T)>)
    {
        // With at most one pusher nothing is exchanged; hand the batches back untouched.
        if self.pushers.len() <= 1 {
            return;
        }

        if !messages.is_empty() || !internal.is_empty() {
            self.logging.when_enabled(|l| l.log(::logging::TimelyEvent::Progress(::logging::ProgressEvent {
                is_send: true,
                source: self.source,
                comm_channel: self.comm_channel,
                seq_no: self.counter,
                addr: self.addr.clone(),
                messages: Vec::new(),
                internal: Vec::new(),
            })));

            // Each pusher takes ownership of its own copy of the two update lists.
            for pusher in self.pushers.iter_mut() {
                pusher.push(&mut Some((self.source, self.counter, messages.clone().into_inner(), internal.clone().into_inner())));
            }
            self.counter += 1;

            messages.clear();
            internal.clear();
        }

        // The received payload is only read, so shared (`ref`) bindings are sufficient.
        while let Some((ref source, ref counter, ref recv_messages, ref recv_internal)) = *self.puller.pull() {
            // Hoisted out of the closure: it must not capture `self` while `self.puller`
            // is still borrowed by the pattern above.
            let comm_channel = self.comm_channel;
            let addr = &self.addr;
            self.logging.when_enabled(|l| l.log(::logging::TimelyEvent::Progress(::logging::ProgressEvent {
                is_send: false,
                source: *source,
                seq_no: *counter,
                comm_channel,
                addr: addr.clone(),
                messages: Vec::new(),
                internal: Vec::new(),
            })));

            // Updates are cloned out of the borrowed message rather than drained from it.
            for &(ref update, delta) in recv_messages.iter() {
                messages.update(update.clone(), delta);
            }
            for &(ref update, delta) in recv_internal.iter() {
                internal.update(update.clone(), delta);
            }
        }
    }
}