1use crate::{BoundedBTreeMap, BoundedBTreeSet, BoundedVec, WeakBoundedVec};
19use alloc::vec::Vec;
20use codec::Decode;
21
22mod private {
24 use super::*;
25
26 pub trait StreamIter {
28 type Iterator: core::iter::Iterator;
30
31 fn stream_iter(key: Vec<u8>) -> Self::Iterator;
33 }
34
35 impl<T: codec::Decode> StreamIter for Vec<T> {
36 type Iterator = ScaleContainerStreamIter<T>;
37
38 fn stream_iter(key: Vec<u8>) -> Self::Iterator {
39 ScaleContainerStreamIter::new(key)
40 }
41 }
42
43 impl<T: codec::Decode> StreamIter for alloc::collections::btree_set::BTreeSet<T> {
44 type Iterator = ScaleContainerStreamIter<T>;
45
46 fn stream_iter(key: Vec<u8>) -> Self::Iterator {
47 ScaleContainerStreamIter::new(key)
48 }
49 }
50
51 impl<K: codec::Decode, V: codec::Decode> StreamIter
52 for alloc::collections::btree_map::BTreeMap<K, V>
53 {
54 type Iterator = ScaleContainerStreamIter<(K, V)>;
55
56 fn stream_iter(key: Vec<u8>) -> Self::Iterator {
57 ScaleContainerStreamIter::new(key)
58 }
59 }
60
61 impl<T: codec::Decode, S> StreamIter for BoundedVec<T, S> {
62 type Iterator = ScaleContainerStreamIter<T>;
63
64 fn stream_iter(key: Vec<u8>) -> Self::Iterator {
65 ScaleContainerStreamIter::new(key)
66 }
67 }
68
69 impl<T: codec::Decode, S> StreamIter for WeakBoundedVec<T, S> {
70 type Iterator = ScaleContainerStreamIter<T>;
71
72 fn stream_iter(key: Vec<u8>) -> Self::Iterator {
73 ScaleContainerStreamIter::new(key)
74 }
75 }
76
77 impl<K: codec::Decode, V: codec::Decode, S> StreamIter for BoundedBTreeMap<K, V, S> {
78 type Iterator = ScaleContainerStreamIter<(K, V)>;
79
80 fn stream_iter(key: Vec<u8>) -> Self::Iterator {
81 ScaleContainerStreamIter::new(key)
82 }
83 }
84
85 impl<T: codec::Decode, S> StreamIter for BoundedBTreeSet<T, S> {
86 type Iterator = ScaleContainerStreamIter<T>;
87
88 fn stream_iter(key: Vec<u8>) -> Self::Iterator {
89 ScaleContainerStreamIter::new(key)
90 }
91 }
92}
93
94pub trait StorageStreamIter<T: private::StreamIter> {
104 fn stream_iter() -> T::Iterator;
106}
107
108impl<T: private::StreamIter + codec::FullCodec, StorageValue: super::StorageValue<T>>
109 StorageStreamIter<T> for StorageValue
110{
111 fn stream_iter() -> T::Iterator {
112 T::stream_iter(Self::hashed_key().into())
113 }
114}
115
116pub struct ScaleContainerStreamIter<T> {
126 marker: core::marker::PhantomData<T>,
127 input: StorageInput,
128 length: u32,
129 read: u32,
130}
131
132impl<T> ScaleContainerStreamIter<T> {
133 pub fn new(key: Vec<u8>) -> Self {
139 let mut input = StorageInput::new(key);
140 let length = if input.exists() {
141 match codec::Compact::<u32>::decode(&mut input) {
142 Ok(length) => length.0,
143 Err(e) => {
144 log::error!(
146 target: "runtime::storage",
147 "Corrupted state at `{:?}`: failed to decode element count: {:?}",
148 input.key,
149 e,
150 );
151
152 0
153 },
154 }
155 } else {
156 0
157 };
158
159 Self { marker: core::marker::PhantomData, input, length, read: 0 }
160 }
161
162 pub fn new_try(key: Vec<u8>) -> Result<Self, codec::Error> {
168 let mut input = StorageInput::new(key);
169 let length = if input.exists() { codec::Compact::<u32>::decode(&mut input)?.0 } else { 0 };
170
171 Ok(Self { marker: core::marker::PhantomData, input, length, read: 0 })
172 }
173}
174
175impl<T: codec::Decode> core::iter::Iterator for ScaleContainerStreamIter<T> {
176 type Item = T;
177
178 fn next(&mut self) -> Option<T> {
179 if self.read >= self.length {
180 return None
181 }
182
183 match codec::Decode::decode(&mut self.input) {
184 Ok(r) => {
185 self.read += 1;
186 Some(r)
187 },
188 Err(e) => {
189 log::error!(
190 target: "runtime::storage",
191 "Corrupted state at `{:?}`: failed to decode element {} (out of {} in total): {:?}",
192 self.input.key,
193 self.read,
194 self.length,
195 e,
196 );
197
198 self.read = self.length;
199 None
200 },
201 }
202 }
203
204 fn size_hint(&self) -> (usize, Option<usize>) {
205 let left = (self.length - self.read) as usize;
206
207 (left, Some(left))
208 }
209}
210
211const STORAGE_INPUT_BUFFER_CAPACITY: usize = 2 * 1024;
216
217struct StorageInput {
225 key: Vec<u8>,
226 offset: u32,
227 total_length: u32,
228 exists: bool,
229 buffer: Vec<u8>,
230 buffer_pos: usize,
231}
232
233impl StorageInput {
234 fn new(key: Vec<u8>) -> Self {
238 let mut buffer = alloc::vec![0; STORAGE_INPUT_BUFFER_CAPACITY];
239 unsafe {
240 buffer.set_len(buffer.capacity());
241 }
242
243 let (total_length, exists) =
244 if let Some(total_length) = sp_io::storage::read(&key, &mut buffer, 0) {
245 (total_length, true)
246 } else {
247 (0, false)
248 };
249
250 if (total_length as usize) < buffer.len() {
251 unsafe {
252 buffer.set_len(total_length as usize);
253 }
254 }
255
256 Self { total_length, offset: buffer.len() as u32, key, exists, buffer, buffer_pos: 0 }
257 }
258
259 fn fill_buffer(&mut self) -> Result<(), codec::Error> {
261 self.buffer.copy_within(self.buffer_pos.., 0);
262 let present_bytes = self.buffer.len() - self.buffer_pos;
263 self.buffer_pos = 0;
264
265 unsafe {
266 self.buffer.set_len(self.buffer.capacity());
267 }
268
269 if let Some(length_minus_offset) =
270 sp_io::storage::read(&self.key, &mut self.buffer[present_bytes..], self.offset)
271 {
272 let bytes_read =
273 core::cmp::min(length_minus_offset as usize, self.buffer.len() - present_bytes);
274 let buffer_len = present_bytes + bytes_read;
275 unsafe {
276 self.buffer.set_len(buffer_len);
277 }
278
279 self.ensure_total_length_did_not_change(length_minus_offset)?;
280
281 self.offset += bytes_read as u32;
282
283 Ok(())
284 } else {
285 self.stop_reading();
287
288 Err("Value doesn't exist in the state?".into())
289 }
290 }
291
292 fn exists(&self) -> bool {
294 self.exists
295 }
296
297 #[inline(never)]
302 fn read_big_item(&mut self, into: &mut [u8]) -> Result<(), codec::Error> {
303 let num_cached = self.buffer.len() - self.buffer_pos;
304
305 let (out_already_read, mut out_remaining) = into.split_at_mut(num_cached);
306 out_already_read.copy_from_slice(&self.buffer[self.buffer_pos..]);
307
308 self.buffer_pos = 0;
309 unsafe {
310 self.buffer.set_len(0);
311 }
312
313 if let Some(length_minus_offset) =
314 sp_io::storage::read(&self.key, &mut out_remaining, self.offset)
315 {
316 if (length_minus_offset as usize) < out_remaining.len() {
317 return Err("Not enough data to fill the buffer".into())
318 }
319
320 self.ensure_total_length_did_not_change(length_minus_offset)?;
321
322 self.offset += out_remaining.len() as u32;
323
324 Ok(())
325 } else {
326 self.stop_reading();
328
329 Err("Value doesn't exist in the state?".into())
330 }
331 }
332
333 fn ensure_total_length_did_not_change(
337 &mut self,
338 length_minus_offset: u32,
339 ) -> Result<(), codec::Error> {
340 if self.total_length == self.offset + length_minus_offset {
341 Ok(())
342 } else {
343 self.stop_reading();
345
346 Err("Storage value changed while it is being read!".into())
347 }
348 }
349
350 fn stop_reading(&mut self) {
354 self.offset = self.total_length;
355
356 self.buffer_pos = 0;
357 unsafe {
358 self.buffer.set_len(0);
359 }
360 }
361}
362
363impl codec::Input for StorageInput {
364 fn remaining_len(&mut self) -> Result<Option<usize>, codec::Error> {
365 Ok(Some(self.total_length.saturating_sub(
366 self.offset.saturating_sub((self.buffer.len() - self.buffer_pos) as u32),
367 ) as usize))
368 }
369
370 fn read(&mut self, into: &mut [u8]) -> Result<(), codec::Error> {
371 if self.offset < self.total_length {
373 if into.len() > self.buffer.capacity() {
374 return self.read_big_item(into)
375 } else if self.buffer_pos + into.len() > self.buffer.len() {
376 self.fill_buffer()?;
377 }
378 }
379
380 if into.len() + self.buffer_pos > self.buffer.len() {
383 return Err("Not enough data to fill the buffer".into())
384 }
385
386 let end = self.buffer_pos + into.len();
387 into.copy_from_slice(&self.buffer[self.buffer_pos..end]);
388 self.buffer_pos = end;
389
390 Ok(())
391 }
392}
393
394#[cfg(test)]
395mod tests {
396 use super::*;
397 use codec::{Compact, CompactLen, Encode, Input};
398
399 #[crate::storage_alias]
400 pub type TestVecU32 = StorageValue<Test, Vec<u32>>;
401
402 #[crate::storage_alias]
403 pub type TestVecVecU8 = StorageValue<Test, Vec<Vec<u8>>>;
404
405 #[test]
406 fn remaining_len_works() {
407 sp_io::TestExternalities::default().execute_with(|| {
408 let data: Vec<u32> = vec![1, 2, 3, 4, 5];
409 TestVecU32::put(&data);
410
411 let mut input = StorageInput::new(TestVecU32::hashed_key().into());
412 assert_eq!(
413 5 * std::mem::size_of::<u32>() + Compact::<u32>::compact_len(&5) as usize,
414 input.remaining_len().ok().flatten().unwrap()
415 );
416
417 assert_eq!(5, Compact::<u32>::decode(&mut input).unwrap().0);
418 assert_eq!(
419 5 * std::mem::size_of::<u32>(),
420 input.remaining_len().ok().flatten().unwrap()
421 );
422
423 for i in &data {
424 assert_eq!(*i, u32::decode(&mut input).unwrap());
425 assert_eq!(
426 (5 - *i as usize) * std::mem::size_of::<u32>(),
427 input.remaining_len().ok().flatten().unwrap()
428 );
429 }
430
431 let data: Vec<Vec<u8>> = vec![
432 vec![0; 20],
433 vec![1; STORAGE_INPUT_BUFFER_CAPACITY * 2],
434 vec![2; STORAGE_INPUT_BUFFER_CAPACITY * 2],
435 vec![3; 30],
436 vec![4; 30],
437 vec![5; STORAGE_INPUT_BUFFER_CAPACITY * 2],
438 vec![6; 30],
439 ];
440 TestVecVecU8::put(&data);
441
442 let mut input = StorageInput::new(TestVecVecU8::hashed_key().into());
443 let total_data_len = data
444 .iter()
445 .map(|v| v.len() + Compact::<u32>::compact_len(&(v.len() as u32)) as usize)
446 .sum::<usize>();
447 assert_eq!(
448 total_data_len + Compact::<u32>::compact_len(&(data.len() as u32)) as usize,
449 input.remaining_len().ok().flatten().unwrap()
450 );
451
452 assert_eq!(data.len(), Compact::<u32>::decode(&mut input).unwrap().0 as usize);
453 assert_eq!(total_data_len, input.remaining_len().ok().flatten().unwrap());
454
455 let mut remaining_len = total_data_len;
456 for i in data {
457 assert_eq!(i, Vec::<u8>::decode(&mut input).unwrap());
458
459 remaining_len -= i.len() + Compact::<u32>::compact_len(&(i.len() as u32)) as usize;
460
461 assert_eq!(remaining_len, input.remaining_len().ok().flatten().unwrap());
462 }
463 })
464 }
465
466 #[test]
467 fn detects_value_total_length_change() {
468 sp_io::TestExternalities::default().execute_with(|| {
469 let test_data: Vec<Vec<Vec<u8>>> = vec![
470 vec![vec![0; 20], vec![1; STORAGE_INPUT_BUFFER_CAPACITY * 2]],
471 vec![
472 vec![0; STORAGE_INPUT_BUFFER_CAPACITY - 1],
473 vec![1; STORAGE_INPUT_BUFFER_CAPACITY - 1],
474 ],
475 ];
476
477 for data in test_data {
478 TestVecVecU8::put(&data);
479
480 let mut input = StorageInput::new(TestVecVecU8::hashed_key().into());
481
482 Compact::<u32>::decode(&mut input).unwrap();
483 Vec::<u8>::decode(&mut input).unwrap();
484
485 TestVecVecU8::append(vec![1, 2, 3]);
486
487 assert!(Vec::<u8>::decode(&mut input)
488 .unwrap_err()
489 .to_string()
490 .contains("Storage value changed while it is being read"));
491
492 assert!(Vec::<u8>::decode(&mut input)
494 .unwrap_err()
495 .to_string()
496 .contains("Not enough data to fill the buffer"));
497 }
498 })
499 }
500
501 #[test]
502 fn stream_read_test() {
503 sp_io::TestExternalities::default().execute_with(|| {
504 let data: Vec<u32> = vec![1, 2, 3, 4, 5];
505 TestVecU32::put(&data);
506
507 assert_eq!(data, TestVecU32::stream_iter().collect::<Vec<_>>());
508
509 let data: Vec<Vec<u8>> = vec![vec![0; 3000], vec![1; 2500]];
510 TestVecVecU8::put(&data);
511
512 assert_eq!(data, TestVecVecU8::stream_iter().collect::<Vec<_>>());
513 })
514 }
515
516 #[test]
517 fn reading_big_intermediate_value() {
518 sp_io::TestExternalities::default().execute_with(|| {
519 let data: Vec<Vec<u8>> =
520 vec![vec![0; 20], vec![1; STORAGE_INPUT_BUFFER_CAPACITY * 2], vec![2; 30]];
521 TestVecVecU8::put(&data);
522
523 assert_eq!(data, TestVecVecU8::stream_iter().collect::<Vec<_>>());
524
525 let data: Vec<Vec<u8>> = vec![
526 vec![0; 20],
527 vec![1; STORAGE_INPUT_BUFFER_CAPACITY * 2],
528 vec![2; STORAGE_INPUT_BUFFER_CAPACITY * 2],
529 vec![3; 30],
530 vec![4; 30],
531 vec![5; STORAGE_INPUT_BUFFER_CAPACITY * 2],
532 vec![6; 30],
533 ];
534 TestVecVecU8::put(&data);
535
536 assert_eq!(data, TestVecVecU8::stream_iter().collect::<Vec<_>>());
537 })
538 }
539
540 #[test]
541 fn reading_more_data_as_in_the_state_is_detected() {
542 sp_io::TestExternalities::default().execute_with(|| {
543 let data: Vec<Vec<u8>> = vec![vec![0; 20], vec![1; STORAGE_INPUT_BUFFER_CAPACITY * 2]];
544 TestVecVecU8::put(&data);
545
546 let mut input = StorageInput::new(TestVecVecU8::hashed_key().into());
547
548 Compact::<u32>::decode(&mut input).unwrap();
549
550 Vec::<u8>::decode(&mut input).unwrap();
551
552 let mut buffer = vec![0; STORAGE_INPUT_BUFFER_CAPACITY * 4];
553 assert!(input
554 .read(&mut buffer)
555 .unwrap_err()
556 .to_string()
557 .contains("Not enough data to fill the buffer"));
558 })
559 }
560
561 #[test]
562 fn reading_invalid_data_from_state() {
563 sp_io::TestExternalities::default().execute_with(|| {
564 let data: Vec<u32> = vec![1, 2, 3, 4, 5];
565
566 let mut data_encoded = data.encode();
567 data_encoded.truncate(data_encoded.len() - 2);
568 sp_io::storage::set(&TestVecU32::hashed_key(), &data_encoded);
569 assert_eq!(
570 data.iter().copied().take(data.len() - 1).collect::<Vec<_>>(),
571 TestVecU32::stream_iter().collect::<Vec<_>>()
572 );
573
574 let data_encoded = data.encode()[2..].to_vec();
575 sp_io::storage::set(&TestVecU32::hashed_key(), &data_encoded);
576 assert!(TestVecU32::stream_iter().collect::<Vec<_>>().is_empty());
577
578 let data: Vec<Vec<u8>> = vec![vec![0; 20], vec![1; STORAGE_INPUT_BUFFER_CAPACITY * 2]];
579 let mut data_encoded = data.encode();
580 data_encoded.truncate(data_encoded.len() - 100);
581 sp_io::storage::set(&TestVecVecU8::hashed_key(), &data_encoded);
582
583 assert_eq!(
584 data.iter().cloned().take(1).collect::<Vec<_>>(),
585 TestVecVecU8::stream_iter().collect::<Vec<_>>()
586 );
587 })
588 }
589
590 #[test]
591 fn reading_with_fill_buffer() {
592 sp_io::TestExternalities::default().execute_with(|| {
593 const BUFFER_SIZE: usize = 300;
594 assert!(STORAGE_INPUT_BUFFER_CAPACITY % BUFFER_SIZE != 0, "Please update buffer size");
596 let data: Vec<Vec<u8>> = (0..=(STORAGE_INPUT_BUFFER_CAPACITY / BUFFER_SIZE))
599 .into_iter()
600 .map(|i| vec![i as u8; BUFFER_SIZE])
601 .collect::<Vec<Vec<u8>>>();
602 TestVecVecU8::put(&data);
603
604 assert_eq!(data, TestVecVecU8::stream_iter().collect::<Vec<_>>());
605
606 let mut input = StorageInput::new(TestVecVecU8::hashed_key().into());
607
608 Compact::<u32>::decode(&mut input).unwrap();
609
610 (0..data.len() - 1).into_iter().for_each(|_| {
611 Vec::<u8>::decode(&mut input).unwrap();
612 });
613
614 let mut result_buffer = vec![0; BUFFER_SIZE * 2];
616 assert!(input
617 .read(&mut result_buffer)
618 .unwrap_err()
619 .to_string()
620 .contains("Not enough data to fill the buffer"));
621 })
622 }
623
624 #[test]
625 fn detect_value_deleted_in_state() {
626 sp_io::TestExternalities::default().execute_with(|| {
627 let data: Vec<Vec<u8>> = vec![vec![0; 20], vec![1; STORAGE_INPUT_BUFFER_CAPACITY * 2]];
628 TestVecVecU8::put(&data);
629
630 let mut input = StorageInput::new(TestVecVecU8::hashed_key().into());
631 TestVecVecU8::kill();
632
633 Compact::<u32>::decode(&mut input).unwrap();
634 Vec::<u8>::decode(&mut input).unwrap();
635
636 assert!(Vec::<u8>::decode(&mut input)
637 .unwrap_err()
638 .to_string()
639 .contains("Value doesn't exist in the state?"));
640
641 const BUFFER_SIZE: usize = 300;
642 assert!(STORAGE_INPUT_BUFFER_CAPACITY % BUFFER_SIZE != 0, "Please update buffer size");
644 let data: Vec<Vec<u8>> = (0..=(STORAGE_INPUT_BUFFER_CAPACITY / BUFFER_SIZE))
647 .into_iter()
648 .map(|i| vec![i as u8; BUFFER_SIZE])
649 .collect::<Vec<Vec<u8>>>();
650 TestVecVecU8::put(&data);
651
652 let mut input = StorageInput::new(TestVecVecU8::hashed_key().into());
653 TestVecVecU8::kill();
654
655 Compact::<u32>::decode(&mut input).unwrap();
656 (0..data.len() - 1).into_iter().for_each(|_| {
657 Vec::<u8>::decode(&mut input).unwrap();
658 });
659
660 assert!(Vec::<u8>::decode(&mut input)
661 .unwrap_err()
662 .to_string()
663 .contains("Value doesn't exist in the state?"));
664 })
665 }
666}