//! Typed channel allocation between `Process` peers that share state through an
//! `Arc<Mutex<..>>`, built on `std::sync::mpsc` channels.
use std::sync::{Arc, Mutex};
use std::any::Any;
use std::sync::mpsc::{Sender, Receiver, channel};
use allocator::{Allocate, Thread};
use {Push, Pull};
/// An allocator handle for one of `peers` participants that exchange data
/// through `std::sync::mpsc` channels.
///
/// All handles produced by a single `Process::new_vector` call share the same
/// `channels` list, which is how they agree on the endpoints of each channel.
pub struct Process {
    /// `Thread` allocator handed out by `Process::inner`.
    inner: Thread,
    /// Position of this handle among its peers; reported by `Allocate::index`.
    index: usize,
    /// Total number of handles created together; reported by `Allocate::peers`.
    peers: usize,
    /// Number of channels this handle has allocated so far. It doubles as the
    /// position in `channels` of the next channel to allocate.
    allocated: usize,
    /// Shared list with one type-erased entry per allocated channel. Each entry
    /// holds, per peer, the endpoints that peer has not yet claimed.
    channels: Arc<Mutex<Vec<Box<Any+Send>>>>,
}
impl Process {
    /// Returns mutable access to the wrapped `Thread` allocator.
    pub fn inner(&mut self) -> &mut Thread {
        &mut self.inner
    }

    /// Builds `count` allocators that all share one channel list.
    ///
    /// The handles receive indices `0 .. count` in order, each reports `count`
    /// peers, and each starts with no channels allocated. An empty vector is
    /// returned when `count` is zero.
    pub fn new_vector(count: usize) -> Vec<Process> {
        // One list for the whole group; every handle gets its own `Arc` clone.
        let shared = Arc::new(Mutex::new(Vec::new()));
        let mut processes = Vec::new();
        for position in 0..count {
            processes.push(Process {
                inner: Thread,
                index: position,
                peers: count,
                allocated: 0,
                channels: shared.clone(),
            });
        }
        processes
    }
}
impl Allocate for Process {
    /// The index assigned to this handle when it was created.
    fn index(&self) -> usize {
        self.index
    }

    /// The number of handles that were created together with this one.
    fn peers(&self) -> usize {
        self.peers
    }

    /// Allocates the next channel carrying values of type `T`.
    ///
    /// Returns one pusher per peer (the pusher at position `j` feeds the puller
    /// claimed by the peer with index `j`), the puller for this handle, and
    /// `None` as the third component.
    ///
    /// The first peer to reach a given allocation position creates the
    /// endpoints for every peer and stores them in the shared list. Every peer,
    /// including that first one, then claims the entry at its own index.
    ///
    /// # Panics
    ///
    /// * if the shared mutex is poisoned;
    /// * if the stored entry for this position was created for a different `T`;
    /// * if this handle's endpoints for this position were already claimed.
    fn allocate<T: Any+Send+'static>(&mut self) -> (Vec<Box<Push<T>>>, Box<Pull<T>>, Option<usize>) {
        let mut registry = self.channels.lock().ok().expect("mutex error?");

        // Nobody has built this channel yet: create endpoints for all peers.
        if registry.len() == self.allocated {
            // One mpsc channel per peer; sender `j` and receiver `j` belong together.
            let (senders, receivers): (Vec<Pusher<T>>, Vec<Puller<T>>) = (0..self.peers)
                .map(|_| {
                    let (tx, rx) = channel::<T>();
                    (Pusher { target: tx }, Puller { source: rx, current: None })
                })
                .unzip();

            // Each peer gets a copy of every sender plus its own receiver.
            let endpoints: Vec<Option<(Vec<Pusher<T>>, Puller<T>)>> = receivers
                .into_iter()
                .map(|rx| Some((senders.clone(), rx)))
                .collect();

            registry.push(Box::new(endpoints));
        }

        // Recover the typed entry and claim this handle's slot, leaving `None` behind.
        let claimed = match registry[self.allocated].downcast_mut::<Vec<Option<(Vec<Pusher<T>>, Puller<T>)>>>() {
            Some(endpoints) => endpoints[self.index].take(),
            None => panic!("failed to correctly cast channel"),
        };

        match claimed {
            Some((senders, receiver)) => {
                self.allocated += 1;
                let pushers: Vec<Box<Push<T>>> = senders
                    .into_iter()
                    .map(|sender| Box::new(sender) as Box<Push<T>>)
                    .collect();
                (pushers, Box::new(receiver) as Box<Pull<T>>, None)
            }
            None => panic!("channel already consumed"),
        }
    }
}
/// The sending endpoint of a channel: a thin wrapper around an mpsc `Sender`.
struct Pusher<T> {
    /// Sender through which pushed elements are delivered.
    target: Sender<T>,
}
// Implemented by hand rather than derived: `#[derive(Clone)]` would add a
// `T: Clone` bound, whereas cloning the underlying `Sender<T>` needs none.
impl<T> Clone for Pusher<T> {
    /// Produces another pusher that feeds the same receiver.
    fn clone(&self) -> Self {
        let target = self.target.clone();
        Pusher { target: target }
    }
}
impl<T> Push<T> for Pusher<T> {
    /// Moves the value out of `element` (leaving `None`) and sends it.
    ///
    /// Does nothing when `element` is already `None`.
    ///
    /// # Panics
    ///
    /// Panics if the send fails, which `Sender::send` reports when the
    /// receiving end has been dropped.
    #[inline]
    fn push(&mut self, element: &mut Option<T>) {
        match element.take() {
            Some(payload) => self.target.send(payload).unwrap(),
            None => {}
        }
    }
}
/// The receiving endpoint of a channel: an mpsc `Receiver` plus a one-element
/// buffer that holds the most recently received value.
struct Puller<T> {
    /// Result of the latest receive attempt; `None` if nothing was available.
    current: Option<T>,
    /// Receiver from which elements are drawn.
    source: Receiver<T>,
}
impl<T> Pull<T> for Puller<T> {
    /// Attempts a non-blocking receive and returns the internal buffer.
    ///
    /// The buffer is overwritten on every call: it holds `Some(value)` when a
    /// value was waiting, and `None` when the receive attempt failed (nothing
    /// queued, or all senders gone). Any value left in the buffer from a
    /// previous call is dropped.
    #[inline]
    fn pull(&mut self) -> &mut Option<T> {
        self.current = match self.source.try_recv() {
            Ok(value) => Some(value),
            Err(_) => None,
        };
        &mut self.current
    }
}