lz4/
decoder.rs

1use super::liblz4::*;
2use super::size_t;
3use std::io::{Error, ErrorKind, Read, Result};
4use std::ptr;
5
/// Capacity, in bytes, of the staging buffer that holds compressed input
/// pulled from the wrapped reader (32 KiB).
const BUFFER_SIZE: usize = 1 << 15;
7
8#[derive(Debug)]
9struct DecoderContext {
10    c: LZ4FDecompressionContext,
11}
12
13#[derive(Debug)]
14pub struct Decoder<R> {
15    c: DecoderContext,
16    r: R,
17    buf: Box<[u8]>,
18    pos: usize,
19    len: usize,
20    next: usize,
21}
22
23impl<R: Read> Decoder<R> {
24    /// Creates a new decoder which reads its input from the given
25    /// input stream. The input stream can be re-acquired by calling
26    /// `finish()`
27    pub fn new(r: R) -> Result<Decoder<R>> {
28        Ok(Decoder {
29            r,
30            c: DecoderContext::new()?,
31            buf: vec![0; BUFFER_SIZE].into_boxed_slice(),
32            pos: BUFFER_SIZE,
33            len: BUFFER_SIZE,
34            // Minimal LZ4 stream size
35            next: 11,
36        })
37    }
38
39    /// Immutable reader reference.
40    pub fn reader(&self) -> &R {
41        &self.r
42    }
43
44    pub fn finish(self) -> (R, Result<()>) {
45        (
46            self.r,
47            match self.next {
48                0 => Ok(()),
49                _ => Err(Error::new(
50                    ErrorKind::Interrupted,
51                    "Finish runned before read end of compressed stream",
52                )),
53            },
54        )
55    }
56}
57
58impl<R: Read> Read for Decoder<R> {
59    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
60        if self.next == 0 || buf.is_empty() {
61            return Ok(0);
62        }
63        let mut dst_offset: usize = 0;
64        while dst_offset == 0 {
65            if self.pos >= self.len {
66                let need = if self.buf.len() < self.next {
67                    self.buf.len()
68                } else {
69                    self.next
70                };
71                self.len = self.r.read(&mut self.buf[0..need])?;
72                // NOTE: we do not exit here if there was nothing read
73                // The lz4 context may still have more bytes to emit.
74
75                self.pos = 0;
76                self.next -= self.len;
77            }
78            while (dst_offset < buf.len()) && ((self.pos < self.len) || self.len == 0) {
79                let mut src_size = (self.len - self.pos) as size_t;
80                let mut dst_size = (buf.len() - dst_offset) as size_t;
81                let len = check_error(unsafe {
82                    LZ4F_decompress(
83                        self.c.c,
84                        buf[dst_offset..].as_mut_ptr(),
85                        &mut dst_size,
86                        self.buf[self.pos..].as_ptr(),
87                        &mut src_size,
88                        ptr::null(),
89                    )
90                })?;
91                self.pos += src_size as usize;
92                dst_offset += dst_size as usize;
93
94                // We need to keep trying to read bytes from the decompressor
95                // until it is no longer emitting them, even after it
96                // has finished reading bytes.
97                if dst_size == 0 && src_size == 0 {
98                    return Ok(dst_offset);
99                }
100
101                if len == 0 {
102                    self.next = 0;
103                    return Ok(dst_offset);
104                } else if self.next < len {
105                    self.next = len;
106                }
107            }
108        }
109        Ok(dst_offset)
110    }
111}
112
113impl DecoderContext {
114    fn new() -> Result<DecoderContext> {
115        let mut context = LZ4FDecompressionContext(ptr::null_mut());
116        check_error(unsafe { LZ4F_createDecompressionContext(&mut context, LZ4F_VERSION) })?;
117        Ok(DecoderContext { c: context })
118    }
119}
120
121impl Drop for DecoderContext {
122    fn drop(&mut self) {
123        unsafe { LZ4F_freeDecompressionContext(self.c) };
124    }
125}
126
#[cfg(test)]
mod test {
    extern crate rand;

    use self::rand::rngs::StdRng;
    use self::rand::Rng;
    use super::super::encoder::{Encoder, EncoderBuilder};
    use super::Decoder;
    use std::io::{Cursor, Error, ErrorKind, Read, Result, Write};

    /// Size of the output buffer used by the chunked-read tests.
    const BUFFER_SIZE: usize = 64 * 1024;
    /// Trailer appended after every encoded stream. The decoder must leave it
    /// unread in the wrapped reader.
    const END_MARK: [u8; 4] = [0x9f, 0x77, 0x22, 0x71];

    /// Reader that fails with `ErrorKind::Other` on about 3 out of 4 calls
    /// and otherwise delegates to the wrapped reader.
    struct ErrorWrapper<R: Read, Rn: Rng> {
        r: R,
        rng: Rn,
    }

    impl<R: Read, Rn: Rng> ErrorWrapper<R, Rn> {
        fn new(rng: Rn, read: R) -> Self {
            ErrorWrapper { r: read, rng }
        }
    }

    impl<R: Read, Rn: Rng> Read for ErrorWrapper<R, Rn> {
        fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
            if self.rng.next_u32() & 0x03 == 0 {
                self.r.read(buf)
            } else {
                Err(Error::new(ErrorKind::Other, "Opss..."))
            }
        }
    }

    /// Reader that transparently retries reads failing with `ErrorKind::Other`.
    struct RetryWrapper<R: Read> {
        r: R,
    }

    impl<R: Read> RetryWrapper<R> {
        fn new(read: R) -> Self {
            RetryWrapper { r: read }
        }
    }

    impl<R: Read> Read for RetryWrapper<R> {
        fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
            loop {
                match self.r.read(buf) {
                    Err(ref e) if e.kind() == ErrorKind::Other => continue,
                    result => return result,
                }
            }
        }
    }

    /// Finishes the encoder and appends `END_MARK` after the LZ4 frame.
    fn finish_encode<W: Write>(encoder: Encoder<W>) -> W {
        let (mut buffer, result) = encoder.finish();
        result.unwrap();
        // `write_all`, not `write`: a short write must not silently drop bytes.
        buffer.write_all(&END_MARK).unwrap();
        buffer
    }

    /// Checks that the decoder completed the frame and left exactly
    /// `END_MARK` unread in the wrapped reader.
    fn finish_decode<R: Read>(decoder: Decoder<R>) {
        let (buffer, result) = decoder.finish();
        result.unwrap();

        let mut data = Vec::new();
        RetryWrapper::new(buffer).read_to_end(&mut data).unwrap();
        assert_eq!(&END_MARK[..], &data[..]);
    }

    #[test]
    fn test_decoder_empty() {
        let expected: Vec<u8> = Vec::new();
        let buffer = finish_encode(EncoderBuilder::new().level(1).build(Vec::new()).unwrap());

        let mut decoder = Decoder::new(Cursor::new(buffer)).unwrap();
        let mut actual = Vec::new();

        decoder.read_to_end(&mut actual).unwrap();
        assert_eq!(expected, actual);
        finish_decode(decoder);
    }

    #[test]
    fn test_decoder_smallest() {
        let expected: Vec<u8> = Vec::new();
        let mut buffer = b"\x04\x22\x4d\x18\x40\x40\xc0\x00\x00\x00\x00".to_vec();
        buffer.extend_from_slice(&END_MARK);

        let mut decoder = Decoder::new(Cursor::new(buffer)).unwrap();
        let mut actual = Vec::new();

        decoder.read_to_end(&mut actual).unwrap();
        assert_eq!(expected, actual);
        finish_decode(decoder);
    }

    #[test]
    fn test_decoder_smoke() {
        let mut encoder = EncoderBuilder::new().level(1).build(Vec::new()).unwrap();
        let expected = b"Some data".to_vec();
        encoder.write_all(&expected[..4]).unwrap();
        encoder.write_all(&expected[4..]).unwrap();
        let buffer = finish_encode(encoder);

        let mut decoder = Decoder::new(Cursor::new(buffer)).unwrap();
        let mut actual = Vec::new();

        decoder.read_to_end(&mut actual).unwrap();
        assert_eq!(expected, actual);
        finish_decode(decoder);
    }

    #[test]
    fn test_decoder_random() {
        let mut rnd = random();
        let expected = random_stream(&mut rnd, 1027 * 1023 * 7);
        let mut encoder = EncoderBuilder::new().level(1).build(Vec::new()).unwrap();
        encoder.write_all(&expected).unwrap();
        let encoded = finish_encode(encoder);

        let mut decoder = Decoder::new(Cursor::new(encoded)).unwrap();
        let mut actual = Vec::new();
        loop {
            let mut buffer = [0; BUFFER_SIZE];
            let size = decoder.read(&mut buffer).unwrap();
            if size == 0 {
                break;
            }
            actual.extend_from_slice(&buffer[..size]);
        }
        assert_eq!(expected, actual);
        finish_decode(decoder);
    }

    #[test]
    fn test_retry_read() {
        let mut rnd = random();
        let expected = random_stream(&mut rnd, 1027 * 1023 * 7);
        let mut encoder = EncoderBuilder::new().level(1).build(Vec::new()).unwrap();
        encoder.write_all(&expected).unwrap();
        let encoded = finish_encode(encoder);

        let mut decoder =
            Decoder::new(ErrorWrapper::new(rnd.clone(), Cursor::new(encoded))).unwrap();
        let mut actual = Vec::new();
        loop {
            let mut buffer = [0; BUFFER_SIZE];
            match decoder.read(&mut buffer) {
                Ok(0) => break,
                Ok(size) => actual.extend_from_slice(&buffer[..size]),
                // Only the failures injected by `ErrorWrapper` are expected and
                // retried. Fail loudly on any other kind instead of looping forever.
                Err(e) => assert_eq!(e.kind(), ErrorKind::Other, "unexpected error: {}", e),
            }
        }

        assert_eq!(expected, actual);
        finish_decode(decoder);
    }

    /// Ensure that we emit the full decompressed stream even if we're
    /// using a very small output buffer.
    #[test]
    fn issue_45() {
        // create an encoder
        let mut enc = crate::EncoderBuilder::new().build(Vec::new()).unwrap();

        // write 'a' 100 times to the encoder
        let text: Vec<u8> = vec![b'a'; 100];
        enc.write_all(&text[..]).unwrap();

        // flush the encoder. The frame is NOT finished, so the stream has no
        // end mark and reading stops at the end of the flushed data.
        enc.flush().unwrap();

        // read from the decoder, buf_size bytes at a time
        for buf_size in [5, 10, 15, 20, 25] {
            let mut buf = vec![0; buf_size];

            let mut total_bytes_read = 0;

            // create a decoder wrapping the backing buffer
            let mut dec = crate::Decoder::new(&enc.writer()[..]).unwrap();
            while let Ok(n) = dec.read(&mut buf[..]) {
                if n == 0 {
                    break;
                }

                total_bytes_read += n;
            }

            assert_eq!(total_bytes_read, text.len());
        }
    }

    /// Deterministically seeded RNG so test data is reproducible.
    fn random() -> StdRng {
        let seed: [u8; 32] = [
            157, 164, 190, 237, 231, 103, 60, 22, 197, 108, 51, 176, 30, 170, 155, 21, 163, 249,
            56, 192, 57, 112, 142, 240, 233, 46, 51, 122, 222, 137, 225, 243,
        ];

        rand::SeedableRng::from_seed(seed)
    }

    /// Generates `size` random bytes.
    fn random_stream<R: Rng>(rng: &mut R, size: usize) -> Vec<u8> {
        (0..size).map(|_| rng.gen()).collect()
    }

    #[test]
    fn test_decoder_send() {
        fn check_send<S: Send>(_: &S) {}
        let dec = Decoder::new(Cursor::new(Vec::new())).unwrap();
        check_send(&dec);
    }
}