1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
//! Core types for communicating a collection of `D: Data` records.
//!
//! `Content<D>` is meant to be treated as a `Vec<D>`, with the caveat that it may wrap either
//! typed `Vec<D>` data or binary `Vec<u8>` data that have not yet been deserialized. The type
//! implements `Deref` and `DerefMut` with `Target = Vec<D>`, whose implementations accommodate
//! the possibly serialized representation. `Message<T, D>` pairs such a `Content<D>` with a
//! timestamp, the id of the sending worker, and a sequence number.
use timely_communication::{Serialize, Push};
use std::ops::{Deref, DerefMut};
use abomonation::{Abomonation, encode, decode, measure};

/// A serializable, timestamped batch of records travelling between workers.
#[derive(Clone)]
pub struct Message<T, D> {
    /// Timestamp associated with the message (and so with the records it carries).
    pub time: T,
    /// The records themselves, either already typed or still in serialized form.
    pub data: Content<D>,
    /// Index of the worker that sent this message.
    pub from: usize,
    /// Sequence number of this message within its worker-to-worker stream.
    pub seq: usize,
}

impl<T, D> Message<T, D> {
    /// Assembles a message from a time, its content, the sending worker id, and a sequence
    /// number. The parts are stored as given; nothing is validated or allocated.
    #[inline]
    pub fn new(time: T, data: Content<D>, from: usize, seq: usize) -> Message<T, D> {
        Message { time, data, from, seq }
    }
}

// Implementation required to get different behavior out of communication fabric.
//
// Wire layout written by `into_bytes` and read back by `from_bytes`, in this order:
// `time: T`, `from: usize`, `seq: usize`, then the payload as a `Vec<D>`, each produced by
// abomonation's `encode`. Note this differs from the struct's field order; the two methods
// below must stay in agreement.
impl<T: Abomonation+Clone, D: Abomonation> Serialize for Message<T, D> {
    /// Appends the encoded message to `bytes`.
    ///
    /// `self.data` is encoded as a `Vec<D>` whether it is currently `Typed` or still `Bytes`:
    /// `Content`'s `Deref` impl supplies a `&Vec<D>` view in both cases.
    #[inline]
    fn into_bytes(&mut self, bytes: &mut Vec<u8>) {

        // Reserve the minimal number of bytes to prevent the need to resize.
        let bytes_needed = measure(&self.time) + measure(&self.from) + measure(&self.seq) + measure(self.data.deref());
        bytes.reserve(bytes_needed);

        // Almost like serializing `self`, except `self.data` is special.
        // NOTE(review): each `unwrap` assumes `encode` into a `Vec<u8>` cannot fail — confirm
        // against abomonation's `encode` contract.
        unsafe { encode(&self.time, bytes).unwrap(); }
        unsafe { encode(&self.from, bytes).unwrap(); }
        unsafe { encode(&self.seq, bytes).unwrap(); }
        let vec: &Vec<D> = self.data.deref();
        unsafe { encode(vec, bytes).unwrap(); }
    }
    /// Rebuilds a message from bytes laid out by `into_bytes`, taking over the buffer.
    ///
    /// On return the caller's `bytes` is an empty `Vec`. The header fields are decoded and copied
    /// out (`time` via `clone`). The payload is left in place and wrapped as
    /// `Content::Bytes(buffer, offset, length)`, so it is only turned into typed data on demand.
    ///
    /// Panics if any `decode` call fails, since each result is `unwrap`ped.
    #[inline]
    fn from_bytes(bytes: &mut Vec<u8>) -> Self {
        // This method *steals* `bytes` and avoids allocation and copying.
        let mut bytes = ::std::mem::replace(bytes, Vec::new());
        let x_len = bytes.len();
        let (time, from, seq, offset) = {
            let (t,r) = unsafe { decode::<T>(&mut bytes) }.unwrap();
            let (&f,r) = unsafe { decode::<usize>(r) }.unwrap();
            let (&s,r) = unsafe { decode::<usize>(r) }.unwrap();
            // Header size in bytes: total length minus whatever remains after `seq`.
            let o = x_len - r.len();
            ((*t).clone(), f, s, o)
        };

        // The call to `decode` should mean we can freely dereference.
        // `decode` takes `bytes[offset..]` mutably (presumably to fix up the `Vec<D>` in place,
        // which is what lets `Content`'s `Deref` reinterpret those bytes later). Here it is used
        // only to learn the record count; the decoded vector stays inside `bytes`.
        let length = unsafe { decode::<Vec<D>>(&mut bytes[offset..]) }.unwrap().0.len();
        Message::new(time, Content::Bytes(bytes, offset, length), from, seq)
    }
}

/// A batch of data, represented either as serialized bytes or typed Rust objects.
///
/// NOTE(review): the derived `Clone` copies a `Bytes` buffer byte-for-byte. `Deref` reads
/// `bytes[offset..]` directly as a `Vec<D>`, so any pointers that `decode` wrote into that region
/// would, in the clone, still refer to the original buffer. Confirm whether a cloned `Bytes`
/// value stays valid after the original is dropped.
#[derive(Clone)]
pub enum Content<D> {
    /// A serialized representation of data.
    ///
    /// This representation may be efficiently observed as shared references,
    /// but may only more expensively be converted into typed data.
    ///
    /// Fields, in order: the byte buffer; the offset within it at which the serialized
    /// `Vec<D>` begins; and the number of `D` records it holds (reported by `len`).
    Bytes(Vec<u8>, usize, usize),
    /// Typed data, which may be efficiently mutated or claimed for ownership.
    Typed(Vec<D>),
}

// ALLOC : This Drop implementation gets *very* angry if we drop allocated data.
// ALLOC : It probably shouldn't be used in practice, but should help track down who is being
// ALLOC : bad about respecting allocated memory.
// impl<D> Drop for Message<D> {
//     match self.contents {
//         Content::Bytes(bytes, _, _) => { assert!(bytes.capacity() == 0); }
//         Content::Typed(typed) => { assert!(typed.capacity() == 0); }
//     }
// }

impl<D> Content<D> {
    /// Gives ownership of the content, leaving an empty `Content::Typed` vector behind.
    pub fn take(&mut self) -> Content<D> {
        ::std::mem::replace(self, Content::Typed(Vec::new()))
    }

    /// Default number of elements in a typed allocated message. This could vary as a function of
    /// `std::mem::size_of::<D>()`, so is left as a method rather than a constant.
    #[inline]
    pub fn default_length() -> usize { 1024 }

    /// The number of records in the content.
    ///
    /// For `Content::Bytes` the record count is stored next to the bytes, so nothing is
    /// deserialized. This method can therefore be called even for `D` that do not implement
    /// `Abomonation`.
    #[inline]
    pub fn len(&self) -> usize {
        match *self {
            Content::Bytes(_, _, length) => length,
            Content::Typed(ref data) => data.len(),
        }
    }

    /// Constructs a `Content::Typed` from typed data, replacing its argument with `Vec::new()`.
    #[inline]
    pub fn from_typed(typed: &mut Vec<D>) -> Content<D> {
        Content::Typed(::std::mem::replace(typed, Vec::new()))
    }

    /// Returns the typed vector, cleared but keeping its allocation, or `Vec::new()` if the
    /// content is still binary (the serialized bytes are dropped in that case).
    // ALLOC : dropping of binary data. likely called only by persons who pushed typed data on,
    // ALLOC : so perhaps not all that common. Could put a panic! here just for fun! :D
    // ALLOC : casual dropping of contents of `data`, which might have allocated memory.
    #[inline]
    pub fn into_typed(self) -> Vec<D> {
        match self {
            Content::Bytes(_, _, _) => Vec::new(),
            Content::Typed(mut data) => { data.clear(); data },
        }
    }

    /// Pushes the contents of `buffer` into `pusher` at `time`, ensuring that `buffer` remains
    /// valid once returned.
    ///
    /// On return `buffer` is empty and can hold at least `default_length()` elements. The pusher
    /// may hand back a typed vector whose capacity is exactly `default_length()`; in that case the
    /// vector's allocation is reused. Otherwise a fresh buffer is allocated exactly once. "Otherwise"
    /// covers nothing returned, serialized content returned, or a vector of another capacity.
    #[inline(always)]
    pub fn push_at<T, P: Push<(T, Content<D>)>>(buffer: &mut Vec<D>, time: T, pusher: &mut P) {

        let mut message = Some((time, Content::from_typed(buffer)));
        pusher.push(&mut message);

        let wanted = Content::<D>::default_length();
        *buffer = match message {
            Some((_, Content::Typed(mut typed))) => {
                if typed.capacity() == wanted {
                    // Recycle the returned allocation: it has exactly the expected size.
                    typed.clear();
                    typed
                }
                else {
                    // `Vec::with_capacity` only promises *at least* the requested capacity, so
                    // returned vectors may legitimately differ; replace those.
                    Vec::with_capacity(wanted)
                }
            },
            _ => Vec::with_capacity(wanted),
        };
    }
}

impl<D: Clone+Abomonation> Content<D> {
    /// Swaps the contents with another vector.
    ///
    /// This method is a convenient way to take ownership of the underlying data without
    /// needing to import the `DerefMut` trait and write horrible gunk.
    #[inline]
    pub fn replace_with(&mut self, other: Vec<D>) -> Vec<D> {
        ::std::mem::replace(self.deref_mut(), other)
    }
}


/// Views the content as a `&Vec<D>` without converting or copying it.
///
/// For `Content::Typed` this is the owned vector. For `Content::Bytes(bytes, offset, _)` the
/// bytes starting at `offset` are reinterpreted in place as a `Vec<D>`.
impl<D: Abomonation> Deref for Content<D> {
    type Target = Vec<D>;
    #[inline]
    fn deref(&self) -> &Vec<D> {
        match *self {
            Content::Bytes(ref bytes, offset, _length) => {
                // verify wasn't actually safe, it turns out...
                // SAFETY (review): relies on `bytes[offset..]` holding a `Vec<D>` that has already
                // been through abomonation's `decode`, as `Message::from_bytes` arranges.
                // `get_unchecked` skips the bounds check on `offset`. The transmute from `&u8` to
                // `&Vec<D>` assumes that address is suitably aligned for `Vec<D>`. Neither
                // condition is checked here — TODO confirm.
                unsafe { ::std::mem::transmute(bytes.get_unchecked(offset)) }
            },
            Content::Typed(ref data) => data,
        }
    }
}

// TODO : Rather than .clone() the decoded data, we should try and re-rig serialization so that the
// TODO : underlying byte array can just be handed to Vec::from_raw_parts, cloning any owned data.
// TODO : I think we would need to make sure that the byte array had the right alignment, so that
// TODO : when the Vec is eventually dropped we don't de-allocate the wrong number of bytes.
// TODO : This requires mucking with the Abomonation code, as it doesn't currently let you step in
// TODO : and skip copying the 24 byte Vec struct first. We'd also have to bake in the typed length
// TODO : somewhere outside of this serialized hunk of data.
impl<D: Clone+Abomonation> DerefMut for Content<D> {
    #[inline]
    fn deref_mut(&mut self) -> &mut Vec<D> {
        let value = if let Content::Bytes(ref mut bytes, offset, _length) = *self {
            let data: &Vec<D> = unsafe { ::std::mem::transmute(bytes.get_unchecked(offset)) };
            // let (data, _) = verify::<Vec<D>>(&bytes[offset..]).unwrap();
            // ALLOC : clone() will allocate a Vec<D> and maybe more.
            Some(Content::Typed((*data).clone()))
        }
        else { None };

        if let Some(contents) = value {
            *self = contents;
        }

        if let Content::Typed(ref mut data) = *self {
            data
        }
        else { unreachable!() }
    }
}