use bincode;
use itertools::Itertools;
use rocksdb::{self, PlainTableFactoryOptions, SliceTransform, WriteBatch};
use serde;
use tempfile::{tempdir, TempDir};

use crate::prelude::*;
use crate::state::{RecordResult, State};
use common::SizeOf;

// Incremented on each PersistentState initialization so that IndexSeq
// can be used to create unique identifiers for rows.
type IndexEpoch = u64;

// Monotonically increasing sequence number since last IndexEpoch, used to uniquely identify a row.
type IndexSeq = u64;

// RocksDB key used for storing meta information (like indices).
const META_KEY: &[u8] = b"meta";

// A default column family is always created, so we'll make use of that for meta information.
// The indices themselves are stored in a column family each, with their position in
// PersistentState::indices as name.
const DEFAULT_CF: &str = "default";

// Maximum rows per WriteBatch when building new indices for existing rows.
const INDEX_BATCH_SIZE: usize = 100_000;

// Store index information in RocksDB to avoid rebuilding indices on recovery.
#[derive(Default, Serialize, Deserialize)]
struct PersistentMeta {
    indices: Vec<Vec<usize>>,
    epoch: IndexEpoch,
}

#[derive(Clone)]
struct PersistentIndex {
    column_family: String,
    columns: Vec<usize>,
}

/// PersistentState stores data in RocksDB.
pub struct PersistentState {
    db_opts: rocksdb::Options,
    // We don't really want DB to be an Option, but making it one lets us drop it manually in
    // PersistentState's Drop by setting `self.db = None` - after which we can then discard the
    // persisted files if we want to.
    db: Option<rocksdb::DB>,
    // The first element is always considered the primary index, where the actual data is stored.
    // Subsequent indices maintain pointers to the data in the first index, and cause an additional
    // read during lookups. When `self.has_unique_index` is true the first index is a primary key,
    // and all its keys are considered unique.
    indices: Vec<PersistentIndex>,
    seq: IndexSeq,
    epoch: IndexEpoch,
    has_unique_index: bool,
    // With DurabilityMode::DeleteOnExit,
    // RocksDB files are stored in a temporary directory.
    _directory: Option<TempDir>,
}

impl State for PersistentState {
    fn process_records(&mut self, records: &mut Records, partial_tag: Option<Tag>) {
        assert!(partial_tag.is_none(), "PersistentState can't be partial");
        if records.len() == 0 {
            return;
        }

        let mut batch = WriteBatch::default();
        for r in records.iter() {
            match *r {
                Record::Positive(ref r) => {
                    self.insert(&mut batch, r);
                }
                Record::Negative(ref r) => {
                    self.remove(&mut batch, r);
                }
            }
        }

        // Sync the writes to RocksDB's WAL:
        let mut opts = rocksdb::WriteOptions::default();
        opts.set_sync(true);
        tokio::task::block_in_place(|| self.db.as_ref().unwrap().write_opt(batch, &opts)).unwrap();
    }

    fn lookup(&self, columns: &[usize], key: &KeyType) -> LookupResult {
        let db = self.db.as_ref().unwrap();
        let index_id = self
            .indices
            .iter()
            .position(|index| &index.columns[..] == columns)
            .expect("lookup on non-indexed column set");
        tokio::task::block_in_place(|| {
            let cf = db.cf_handle(&self.indices[index_id].column_family).unwrap();
            let prefix = Self::serialize_prefix(&key);
            let data = if index_id == 0 && self.has_unique_index {
                // This is a primary key, so we know there's only one row to retrieve
                // (no need to use prefix_iterator).
                let raw_row = db.get_cf(cf, &prefix).unwrap();
                if let Some(raw) = raw_row {
                    let row = bincode::deserialize(&*raw).unwrap();
                    vec![row]
                } else {
                    vec![]
                }
            } else {
                // This could correspond to more than one value, so we'll use a prefix_iterator:
                db.prefix_iterator_cf(cf, &prefix)
                    .map(|(_key, value)| bincode::deserialize(&*value).unwrap())
                    .collect()
            };

            LookupResult::Some(RecordResult::Owned(data))
        })
    }

    fn add_key(&mut self, columns: &[usize], partial: Option<Vec<Tag>>) {
        assert!(partial.is_none(), "Bases can't be partial");
        let existing = self
            .indices
            .iter()
            .any(|index| &index.columns[..] == columns);
        if existing {
            return;
        }

        let cols = Vec::from(columns);
        // We'll store all the pointers (or values if this is index 0) for
        // this index in its own column family:
        let index_id = self.indices.len().to_string();
        tokio::task::block_in_place(|| {
            let db = self.db.as_mut().unwrap();
            db.create_cf(&index_id, &self.db_opts).unwrap();

            // Build the new index for existing values:
            if !self.indices.is_empty() {
                let first_cf = db.cf_handle(&self.indices[0].column_family).unwrap();
                let iter = db.full_iterator_cf(first_cf, rocksdb::IteratorMode::Start);
                for chunk in iter.chunks(INDEX_BATCH_SIZE).into_iter() {
                    let mut batch = WriteBatch::default();
                    for (ref pk, ref value) in chunk {
                        let row: Vec<DataType> = bincode::deserialize(&value).unwrap();
                        let index_key = Self::build_key(&row, columns);
                        let key = Self::serialize_secondary(&index_key, pk);
                        let cf = db.cf_handle(&index_id).unwrap();
                        batch.put_cf(cf, &key, value);
                    }

                    db.write(batch).unwrap();
                }
            }

            self.indices.push(PersistentIndex {
                columns: cols,
                column_family: index_id.to_string(),
            });
            self.persist_meta();
        });
    }

    fn keys(&self) -> Vec<Vec<usize>> {
        self.indices
            .iter()
            .map(|index| index.columns.clone())
            .collect()
    }

    fn cloned_records(&self) -> Vec<Vec<DataType>> {
        self.all_rows()
            .map(|(_, ref value)| bincode::deserialize(&value).unwrap())
            .collect()
    }

    // Returns a row count estimate from RocksDB.
    fn rows(&self) -> usize {
        tokio::task::block_in_place(|| {
            let db = self.db.as_ref().unwrap();
            let cf = db.cf_handle("0").unwrap();
            let total_keys = db
                .property_int_value_cf(cf, "rocksdb.estimate-num-keys")
                .unwrap()
                .unwrap() as usize;

            total_keys / self.indices.len()
        })
    }

    fn is_useful(&self) -> bool {
        !self.indices.is_empty()
    }

    fn is_partial(&self) -> bool {
        false
    }

    fn mark_filled(&mut self, _: Vec<DataType>, _: Tag) {
        unreachable!("PersistentState can't be partial")
    }

    fn mark_hole(&mut self, _: &[DataType], _: Tag) {
        unreachable!("PersistentState can't be partial")
    }

    fn evict_random_keys(&mut self, _: usize) -> (&[usize], Vec<Vec<DataType>>, u64) {
        unreachable!("can't evict keys from PersistentState")
    }

    fn evict_keys(&mut self, _: Tag, _: &[Vec<DataType>]) -> Option<(&[usize], u64)> {
        unreachable!("can't evict keys from PersistentState")
    }

    fn clear(&mut self) {
        unreachable!("can't clear PersistentState")
    }
}

impl PersistentState {
    pub fn new(
        name: String,
        primary_key: Option<&[usize]>,
        params: &PersistenceParameters,
    ) -> Self {
        tokio::task::block_in_place(|| {
            use rocksdb::{ColumnFamilyDescriptor, DB};
            let (directory, full_name) = match params.mode {
                DurabilityMode::Permanent => (None, format!("{}.db", name)),
                _ => {
                    let dir = tempdir().unwrap();
                    let path = dir.path().join(name.clone());
                    let full_name = format!("{}.db", path.to_str().unwrap());
                    (Some(dir), full_name)
                }
            };

            let opts = Self::build_options(&name, params);
            // We use a column family for each index, and one for meta information.
            // When opening the DB the exact same column families need to be used,
            // so we'll have to retrieve the existing ones first:
            let column_families = match DB::list_cf(&opts, &full_name) {
                Ok(cfs) => cfs,
                // An error here most likely means the DB doesn't exist yet,
                // so fall back to just the default column family:
                Err(_err) => vec![DEFAULT_CF.to_string()],
            };

            let make_cfs = || -> Vec<ColumnFamilyDescriptor> {
                column_families
                    .iter()
                    .map(|cf| {
                        ColumnFamilyDescriptor::new(cf.clone(), Self::build_options(&name, &params))
                    })
                    .collect()
            };

            let mut db = DB::open_cf_descriptors(&opts, &full_name, make_cfs());
            for _ in 0..100 {
                if db.is_ok() {
                    break;
                }
                ::std::thread::sleep(::std::time::Duration::from_millis(50));
                db = DB::open_cf_descriptors(&opts, &full_name, make_cfs());
            }
            let mut db = db.unwrap();
            let meta = Self::retrieve_and_update_meta(&db);
            let indices: Vec<PersistentIndex> = meta
                .indices
                .into_iter()
                .enumerate()
                .map(|(i, columns)| PersistentIndex {
                    column_family: i.to_string(),
                    columns,
                })
                .collect();

            // If there are more column families than indices (-1 to account for the default
            // column family) we probably crashed while trying to build the last index (in
            // Self::add_key), so we'll throw away our progress and try rebuilding it later:
            if column_families.len() - 1 > indices.len() {
                db.drop_cf(&indices.len().to_string()).unwrap();
            }

            let mut state = Self {
                seq: 0,
                indices,
                has_unique_index: primary_key.is_some(),
                epoch: meta.epoch,
                db_opts: opts,
                db: Some(db),
                _directory: directory,
            };

            if primary_key.is_some() && state.indices.is_empty() {
                // This is the first time we're initializing this PersistentState,
                // so persist the primary key index right away.
                state
                    .db
                    .as_mut()
                    .unwrap()
                    .create_cf("0", &state.db_opts)
                    .unwrap();

                let persistent_index = PersistentIndex {
                    column_family: "0".to_string(),
                    columns: primary_key.unwrap().to_vec(),
                };

                state.indices.push(persistent_index);
                state.persist_meta();
            }

            state
        })
    }

    fn build_options(name: &str, params: &PersistenceParameters) -> rocksdb::Options {
        let mut opts = rocksdb::Options::default();
        opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
        opts.create_if_missing(true);
        opts.create_missing_column_families(true);

        let user_key_length = 0; // variable key length
        let bloom_bits_per_key = 10;
        let hash_table_ratio = 0.75;
        let index_sparseness = 16;
        opts.set_plain_table_factory(&PlainTableFactoryOptions {
            user_key_length,
            bloom_bits_per_key,
            hash_table_ratio,
            index_sparseness,
        });

        if let Some(ref path) = params.log_dir {
            // Append the db name to the WAL path to ensure
            // that we create a directory for each base shard:
            opts.set_wal_dir(path.join(&name));
        }

        // Create prefixes using `prefix_transform` on all new inserted keys:
        let transform = SliceTransform::create("key", prefix_transform, Some(in_domain));
        opts.set_prefix_extractor(transform);

        // Assigns the number of threads for compactions and flushes in RocksDB.
        // Optimally we'd like to use env->SetBackgroundThreads(n, Env::HIGH)
        // and env->SetBackgroundThreads(n, Env::LOW) here, but that would force us to create our
        // own env instead of relying on the default one that's shared across RocksDB instances
        // (which isn't supported by rust-rocksdb yet either).
        //
        // Using opts.increase_parallelism here would only change the thread count in
        // the low priority pool, so we'll rather use the deprecated max_background_compactions
        // and max_background_flushes for now.
        if params.persistence_threads > 1 {
            // Split the threads between compactions and flushes,
            // but round up for compactions and down for flushes:
            opts.set_max_background_compactions((params.persistence_threads + 1) / 2);
            opts.set_max_background_flushes(params.persistence_threads / 2);
        }

        // Increase a few default limits:
        opts.set_max_bytes_for_level_base(2048 * 1024 * 1024);
        opts.set_target_file_size_base(256 * 1024 * 1024);

        // Keep up to 4 parallel memtables:
        opts.set_max_write_buffer_number(4);

        // Use a hash linked list since we're doing prefix seeks.
        opts.set_allow_concurrent_memtable_write(false);
        opts.set_memtable_factory(rocksdb::MemtableFactory::HashLinkList {
            bucket_count: 1_000_000,
        });

        opts
    }

    fn build_key<'a>(row: &'a [DataType], columns: &[usize]) -> KeyType<'a> {
        KeyType::from(columns.iter().map(|i| &row[*i]))
    }

    fn retrieve_and_update_meta(db: &rocksdb::DB) -> PersistentMeta {
        let indices = db.get(META_KEY).unwrap();
        let mut meta = match indices {
            Some(data) => bincode::deserialize(&*data).unwrap(),
            None => PersistentMeta::default(),
        };

        meta.epoch += 1;
        let data = bincode::serialize(&meta).unwrap();
        db.put(META_KEY, &data).unwrap();
        meta
    }

    fn persist_meta(&mut self) {
        let db = self.db.as_ref().unwrap();
        // Stores the columns of self.indices in RocksDB so that we don't rebuild indices on
        // recovery.
        let columns = self.indices.iter().map(|i| i.columns.clone()).collect();
        let meta = PersistentMeta {
            indices: columns,
            epoch: self.epoch,
        };

        let data = bincode::serialize(&meta).unwrap();
        db.put(META_KEY, &data).unwrap();
    }

    // Our RocksDB keys come in three forms, and are encoded as follows:
    //
    // * Unique Primary Keys
    //   (size, key), where size is the serialized byte size of `key`
    //   (used in `prefix_transform`).
    //
    // * Non-unique Primary Keys
    //   (size, key, epoch, seq), where epoch is incremented on each recovery, and seq is a
    //   monotonically increasing sequence number that starts at 0 for every new epoch.
    //
    // * Secondary Index Keys
    //   (size, key, primary_key), where `primary_key` makes sure that each secondary index row
    //   is unique.
    //
    // Self::serialize_raw_key is responsible for serializing the underlying KeyType tuple
    // directly (without the enum variant), plus any extra information as described above.
    fn serialize_raw_key<S: serde::Serialize>(key: &KeyType, extra: S) -> Vec<u8> {
        fn serialize<K: serde::Serialize, E: serde::Serialize>(k: K, extra: E) -> Vec<u8> {
            let size: u64 = bincode::serialized_size(&k).unwrap();
            bincode::serialize(&(size, k, extra)).unwrap()
        }

        match key {
            KeyType::Single(k) => serialize(k, extra),
            KeyType::Double(k) => serialize(k, extra),
            KeyType::Tri(k) => serialize(k, extra),
            KeyType::Quad(k) => serialize(k, extra),
            KeyType::Quin(k) => serialize(k, extra),
            KeyType::Sex(k) => serialize(k, extra),
        }
    }

    fn serialize_prefix(key: &KeyType) -> Vec<u8> {
        Self::serialize_raw_key(key, ())
    }

    fn serialize_secondary(key: &KeyType, raw_primary: &[u8]) -> Vec<u8> {
        let mut bytes = Self::serialize_raw_key(key, ());
        bytes.extend_from_slice(raw_primary);
        bytes
    }

    // Filters out secondary indices to return an iterator for the actual key-value pairs.
    fn all_rows(&self) -> impl Iterator<Item = (Box<[u8]>, Box<[u8]>)> + '_ {
        let db = self.db.as_ref().unwrap();
        let cf = db.cf_handle(&self.indices[0].column_family).unwrap();
        db.full_iterator_cf(cf, rocksdb::IteratorMode::Start)
    }

    // Puts by primary key first, then retrieves the existing value for each index and appends the
    // newly created primary key value.
    // TODO(ekmartin): This will put exactly the values that are given, and can only be retrieved
    // with exactly those values. I think the regular state implementation supports inserting
    // something like an Int and retrieving with a BigInt.
    fn insert(&mut self, batch: &mut WriteBatch, r: &[DataType]) {
        let serialized_pk = {
            let pk = Self::build_key(r, &self.indices[0].columns);
            if self.has_unique_index {
                Self::serialize_prefix(&pk)
            } else {
                // For bases without primary keys we store the actual row values keyed by the
                // index that was added first. This means that we can't consider the keys unique
                // though, so we'll append a sequence number.
                self.seq += 1;
                Self::serialize_raw_key(&pk, (self.epoch, self.seq))
            }
        };

        // First insert the actual value for our primary index:
        let serialized_row = bincode::serialize(&r).unwrap();
        tokio::task::block_in_place(|| {
            let db = self.db.as_ref().unwrap();
            let value_cf = db.cf_handle(&self.indices[0].column_family).unwrap();
            batch.put_cf(value_cf, &serialized_pk, &serialized_row);

            // Then insert primary key pointers for all the secondary indices:
            for index in self.indices[1..].iter() {
                // Construct a key with the index values, and serialize it with bincode:
                let key = Self::build_key(&r, &index.columns);
                let serialized_key = Self::serialize_secondary(&key, &serialized_pk);
                let cf = db.cf_handle(&index.column_family).unwrap();
                batch.put_cf(cf, &serialized_key, &serialized_row);
            }
        })
    }

    fn remove(&self, batch: &mut WriteBatch, r: &[DataType]) {
        tokio::task::block_in_place(|| {
            let db = self.db.as_ref().unwrap();
            let pk_index = &self.indices[0];
            let value_cf = db.cf_handle(&pk_index.column_family).unwrap();
            let mut do_remove = move |primary_key: &[u8]| {
                // Delete the value row first (primary index):
                batch.delete_cf(value_cf, &primary_key);

                // Then delete any references that point _exactly_ to that row:
                for index in self.indices[1..].iter() {
                    let key = Self::build_key(&r, &index.columns);
                    let serialized_key = Self::serialize_secondary(&key, primary_key);
                    let cf = db.cf_handle(&index.column_family).unwrap();
                    batch.delete_cf(cf, &serialized_key);
                }
            };

            let pk = Self::build_key(&r, &pk_index.columns);
            let prefix = Self::serialize_prefix(&pk);
            if self.has_unique_index {
                if cfg!(debug_assertions) {
                    // A mismatch here would imply that we're trying to delete a different row
                    // than the one we found when we resolved the DeleteRequest in Base. This
                    // really shouldn't happen, but we'll leave a check here in debug mode for now.
                    let raw = db
                        .get_cf(value_cf, &prefix)
                        .unwrap()
                        .expect("tried removing nonexistent primary key row");
                    let value: Vec<DataType> = bincode::deserialize(&*raw).unwrap();
                    assert_eq!(r, &value[..], "tried removing non-matching primary key row");
                }

                do_remove(&prefix[..]);
            } else {
                let (key, _value) = db
                    .prefix_iterator_cf(value_cf, &prefix)
                    .find(|(_, raw_value)| {
                        let value: Vec<DataType> = bincode::deserialize(&*raw_value).unwrap();
                        r == &value[..]
                    })
                    .expect("tried removing nonexistent row");
                do_remove(&key[..]);
            };
        })
    }
}

// SliceTransforms are used to create prefixes of all inserted keys, which can then be used for
// both bloom filters and hash structure lookups.
//
// Selects a prefix of `key` without the epoch or sequence number.
//
// The RocksDB docs state the following:
// > If non-nullptr, use the specified function to determine the
// > prefixes for keys. These prefixes will be placed in the filter.
// > Depending on the workload, this can reduce the number of read-IOP
// > cost for scans when a prefix is passed via ReadOptions to
// > db.NewIterator(). For prefix filtering to work properly,
// > "prefix_extractor" and "comparator" must be such that the following
// > properties hold:
//
// > 1) key.starts_with(prefix(key))
// > 2) Compare(prefix(key), key) <= 0.
// > 3) If Compare(k1, k2) <= 0, then Compare(prefix(k1), prefix(k2)) <= 0
// > 4) prefix(prefix(key)) == prefix(key)
//
// NOTE(ekmartin): Encoding the key size in the key increases the total size by 8 bytes.
// If we really wanted to avoid this while still maintaining the same serialization scheme
// we could do so by figuring out how many bytes our bincode serialized KeyType takes
// up here in transform_fn. Example:
// Double((DataType::Int(1), DataType::BigInt(10))) would be serialized as:
// 1u32 (enum type), 0u32 (enum variant), 1i32 (value), 1u32 (enum variant), 1i64 (value)
// By stepping through the serialized bytes and checking each enum variant we would know
// when we reached the end, and could then say with certainty whether we'd already
// prefix transformed this key or not
// (without including the byte size of Vec<DataType>).
fn prefix_transform<'a>(key: &'a [u8]) -> &'a [u8] {
    // We'll have to make sure this isn't the META_KEY even when we're filtering it out
    // in Self::in_domain_fn, as the SliceTransform is used to make hashed keys for our
    // HashLinkedList memtable factory.
    if key == META_KEY {
        return key;
    }

    // We encoded the size of the key itself with a u64, which bincode uses 8 bytes to encode:
    let size_offset = 8;
    let key_size: u64 = bincode::deserialize(&key[..size_offset]).unwrap();
    let prefix_len = size_offset + key_size as usize;
    // Strip away the key suffix if we haven't already done so:
    &key[..prefix_len]
}

// Decides which keys the prefix transform should apply to.
fn in_domain(key: &[u8]) -> bool {
    key != META_KEY
}

impl SizeOf for PersistentState {
    fn size_of(&self) -> u64 {
        use std::mem::size_of;
        size_of::<Self>() as u64
    }

    fn deep_size_of(&self) -> u64 {
        let db = self.db.as_ref().unwrap();
        db.property_int_value("rocksdb.estimate-live-data-size")
            .unwrap()
            .unwrap()
    }

    fn is_empty(&self) -> bool {
        let db = self.db.as_ref().unwrap();
        db.property_int_value("rocksdb.estimate-num-keys")
            .unwrap()
            .unwrap()
            == 0
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use bincode;
    use std::path::PathBuf;

    fn insert<S: State>(state: &mut S, row: Vec<DataType>) {
        let record: Record = row.into();
        state.process_records(&mut record.into(), None);
    }

    fn get_tmp_path() -> (TempDir, String) {
        let dir = tempdir().unwrap();
        let path = dir.path().join("soup");
        (dir, path.to_string_lossy().into())
    }

    fn setup_persistent(prefix: &str) -> PersistentState {
        PersistentState::new(
            String::from(prefix),
            None,
            &PersistenceParameters::default(),
        )
    }

    #[test]
    fn persistent_state_is_partial() {
        let state = setup_persistent("persistent_state_is_partial");
        assert!(!state.is_partial());
    }

    #[test]
    fn persistent_state_single_key() {
        let mut state = setup_persistent("persistent_state_single_key");
        let columns = &[0];
        let row: Vec<DataType> = vec![10.into(), "Cat".into()];
        state.add_key(columns, None);
        insert(&mut state, row);

        match state.lookup(columns, &KeyType::Single(&5.into())) {
            LookupResult::Some(RecordResult::Owned(rows)) => assert_eq!(rows.len(), 0),
            _ => unreachable!(),
        };

        match state.lookup(columns, &KeyType::Single(&10.into())) {
            LookupResult::Some(RecordResult::Owned(rows)) => {
                assert_eq!(rows[0][0], 10.into());
                assert_eq!(rows[0][1], "Cat".into());
            }
            _ => unreachable!(),
        }
    }

    #[test]
    fn persistent_state_multi_key() {
        let mut state = setup_persistent("persistent_state_multi_key");
        let columns = &[0, 2];
        let row: Vec<DataType> = vec![10.into(), "Cat".into(), 20.into()];
        state.add_key(columns, None);
        insert(&mut state, row.clone());

        match state.lookup(columns, &KeyType::Double((1.into(), 2.into()))) {
            LookupResult::Some(RecordResult::Owned(rows)) => assert_eq!(rows.len(), 0),
            _ => unreachable!(),
        };

        match state.lookup(columns, &KeyType::Double((10.into(), 20.into()))) {
            LookupResult::Some(RecordResult::Owned(rows)) => {
                assert_eq!(rows[0], row);
            }
            _ => unreachable!(),
        }
    }
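
    // Not part of the original test suite: a minimal sketch of build_key's behavior, assuming
    // KeyType's multi-column variants own their values the way the lookups in this module
    // construct them. build_key should project the row onto the given columns, in order.
    #[test]
    fn persistent_state_build_key_projects_columns() {
        let row: Vec<DataType> = vec![1.into(), "Cat".into(), 20.into()];
        match PersistentState::build_key(&row, &[0, 2]) {
            KeyType::Double((a, b)) => {
                assert_eq!(a, 1.into());
                assert_eq!(b, 20.into());
            }
            _ => unreachable!(),
        }
    }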

    #[test]
    fn persistent_state_multiple_indices() {
        let mut state = setup_persistent("persistent_state_multiple_indices");
        let first: Vec<DataType> = vec![10.into(), "Cat".into(), 1.into()];
        let second: Vec<DataType> = vec![20.into(), "Cat".into(), 1.into()];
        state.add_key(&[0], None);
        state.add_key(&[1, 2], None);
        state.process_records(&mut vec![first.clone(), second.clone()].into(), None);

        match state.lookup(&[0], &KeyType::Single(&10.into())) {
            LookupResult::Some(RecordResult::Owned(rows)) => {
                assert_eq!(rows.len(), 1);
                assert_eq!(rows[0], first);
            }
            _ => unreachable!(),
        }

        match state.lookup(&[1, 2], &KeyType::Double(("Cat".into(), 1.into()))) {
            LookupResult::Some(RecordResult::Owned(rows)) => {
                assert_eq!(rows.len(), 2);
                assert_eq!(&rows[0], &first);
                assert_eq!(&rows[1], &second);
            }
            _ => unreachable!(),
        }
    }

    #[test]
    fn persistent_state_primary_key() {
        let pk = &[0, 1];
        let mut state = PersistentState::new(
            String::from("persistent_state_primary_key"),
            Some(pk),
            &PersistenceParameters::default(),
        );
        let first: Vec<DataType> = vec![1.into(), 2.into(), "Cat".into()];
        let second: Vec<DataType> = vec![10.into(), 20.into(), "Cat".into()];
        state.add_key(pk, None);
        state.add_key(&[2], None);
        state.process_records(&mut vec![first.clone(), second.clone()].into(), None);

        match state.lookup(pk, &KeyType::Double((1.into(), 2.into()))) {
            LookupResult::Some(RecordResult::Owned(rows)) => {
                assert_eq!(rows.len(), 1);
                assert_eq!(&rows[0], &first);
            }
            _ => unreachable!(),
        }

        match state.lookup(pk, &KeyType::Double((10.into(), 20.into()))) {
            LookupResult::Some(RecordResult::Owned(rows)) => {
                assert_eq!(rows.len(), 1);
                assert_eq!(&rows[0], &second);
            }
            _ => unreachable!(),
        }

        match state.lookup(pk, &KeyType::Double((1.into(), 20.into()))) {
            LookupResult::Some(RecordResult::Owned(rows)) => {
                assert_eq!(rows.len(), 0);
            }
            _ => unreachable!(),
        }

        match state.lookup(&[2], &KeyType::Single(&"Cat".into())) {
            LookupResult::Some(RecordResult::Owned(rows)) => {
                assert_eq!(rows.len(), 2);
                assert_eq!(&rows[0], &first);
                assert_eq!(&rows[1], &second);
            }
            _ => unreachable!(),
        }
    }

    #[test]
    fn persistent_state_primary_key_delete() {
        let pk = &[0];
        let mut state = PersistentState::new(
            String::from("persistent_state_primary_key_delete"),
            Some(pk),
            &PersistenceParameters::default(),
        );
        let first: Vec<DataType> = vec![1.into(), 2.into()];
        let second: Vec<DataType> = vec![10.into(), 20.into()];
        state.add_key(pk, None);
        state.process_records(&mut vec![first.clone(), second.clone()].into(), None);
        match state.lookup(&[0], &KeyType::Single(&1.into())) {
            LookupResult::Some(RecordResult::Owned(rows)) => {
                assert_eq!(rows.len(), 1);
                assert_eq!(&rows[0], &first);
            }
            _ => unreachable!(),
        }

        state.process_records(&mut vec![(first.clone(), false)].into(), None);
        match state.lookup(&[0], &KeyType::Single(&1.into())) {
            LookupResult::Some(RecordResult::Owned(rows)) => {
                assert_eq!(rows.len(), 0);
            }
            _ => unreachable!(),
        }

        match state.lookup(&[0], &KeyType::Single(&10.into())) {
            LookupResult::Some(RecordResult::Owned(rows)) => {
                assert_eq!(rows.len(), 1);
                assert_eq!(&rows[0], &second);
            }
            _ => unreachable!(),
        }
    }

    #[test]
    fn persistent_state_not_unique_primary() {
        let mut state = setup_persistent("persistent_state_not_unique_primary");
        let first: Vec<DataType> = vec![0.into(), 0.into()];
        let second: Vec<DataType> = vec![0.into(), 1.into()];
        state.add_key(&[0], None);
        state.add_key(&[1], None);
        state.process_records(&mut vec![first.clone(), second.clone()].into(), None);

        match state.lookup(&[0], &KeyType::Single(&0.into())) {
            LookupResult::Some(RecordResult::Owned(rows)) => {
                assert_eq!(rows.len(), 2);
                assert_eq!(&rows[0], &first);
                assert_eq!(&rows[1], &second);
            }
            _ => unreachable!(),
        }

        match state.lookup(&[1], &KeyType::Single(&0.into())) {
            LookupResult::Some(RecordResult::Owned(rows)) => {
                assert_eq!(rows.len(), 1);
                assert_eq!(&rows[0], &first);
            }
            _ => unreachable!(),
        }
    }
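
    // Not part of the original test suite: a minimal sketch of the secondary index key layout
    // documented above serialize_raw_key. serialize_secondary emits the serialized (size, key)
    // prefix followed by the raw primary key bytes verbatim; the primary key bytes below are
    // made up for illustration.
    #[test]
    fn persistent_state_serialize_secondary_layout() {
        let value: DataType = 10.into();
        let key = KeyType::Single(&value);
        let raw_pk = [1u8, 2, 3];
        let serialized = PersistentState::serialize_secondary(&key, &raw_pk);
        let prefix = PersistentState::serialize_prefix(&key);
        assert!(serialized.starts_with(&prefix));
        assert!(serialized.ends_with(&raw_pk));
    }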

    #[test]
    fn persistent_state_different_indices() {
        let mut state = setup_persistent("persistent_state_different_indices");
        let first: Vec<DataType> = vec![10.into(), "Cat".into()];
        let second: Vec<DataType> = vec![20.into(), "Bob".into()];
        state.add_key(&[0], None);
        state.add_key(&[1], None);
        state.process_records(&mut vec![first.clone(), second.clone()].into(), None);

        match state.lookup(&[0], &KeyType::Single(&10.into())) {
            LookupResult::Some(RecordResult::Owned(rows)) => {
                assert_eq!(rows.len(), 1);
                assert_eq!(&rows[0], &first);
            }
            _ => unreachable!(),
        }

        match state.lookup(&[1], &KeyType::Single(&"Bob".into())) {
            LookupResult::Some(RecordResult::Owned(rows)) => {
                assert_eq!(rows.len(), 1);
                assert_eq!(&rows[0], &second);
            }
            _ => unreachable!(),
        }
    }

    #[test]
    fn persistent_state_recover() {
        let (_dir, name) = get_tmp_path();
        let mut params = PersistenceParameters::default();
        params.mode = DurabilityMode::Permanent;
        let first: Vec<DataType> = vec![10.into(), "Cat".into()];
        let second: Vec<DataType> = vec![20.into(), "Bob".into()];
        {
            let mut state = PersistentState::new(name.clone(), None, &params);
            state.add_key(&[0], None);
            state.add_key(&[1], None);
            state.process_records(&mut vec![first.clone(), second.clone()].into(), None);
        }

        let state = PersistentState::new(name, None, &params);
        match state.lookup(&[0], &KeyType::Single(&10.into())) {
            LookupResult::Some(RecordResult::Owned(rows)) => {
                assert_eq!(rows.len(), 1);
                assert_eq!(&rows[0], &first);
            }
            _ => unreachable!(),
        }

        match state.lookup(&[1], &KeyType::Single(&"Bob".into())) {
            LookupResult::Some(RecordResult::Owned(rows)) => {
                assert_eq!(rows.len(), 1);
                assert_eq!(&rows[0], &second);
            }
            _ => unreachable!(),
        }
    }

    #[test]
    fn persistent_state_recover_unique_key() {
        let (_dir, name) = get_tmp_path();
        let mut params = PersistenceParameters::default();
        params.mode = DurabilityMode::Permanent;
        let first: Vec<DataType> = vec![10.into(), "Cat".into()];
        let second: Vec<DataType> = vec![20.into(), "Bob".into()];
        {
            let mut state = PersistentState::new(name.clone(), Some(&[0]), &params);
            state.add_key(&[0], None);
            state.add_key(&[1], None);
            state.process_records(&mut vec![first.clone(), second.clone()].into(), None);
        }

        let state = PersistentState::new(name, Some(&[0]), &params);
        match state.lookup(&[0], &KeyType::Single(&10.into())) {
            LookupResult::Some(RecordResult::Owned(rows)) => {
                assert_eq!(rows.len(), 1);
                assert_eq!(&rows[0], &first);
            }
            _ => unreachable!(),
        }

        match state.lookup(&[1], &KeyType::Single(&"Bob".into())) {
            LookupResult::Some(RecordResult::Owned(rows)) => {
                assert_eq!(rows.len(), 1);
                assert_eq!(&rows[0], &second);
            }
            _ => unreachable!(),
        }
    }

    #[test]
    fn persistent_state_remove() {
        let mut state = setup_persistent("persistent_state_remove");
        let first: Vec<DataType> = vec![10.into(), "Cat".into()];
        let duplicate: Vec<DataType> = vec![10.into(), "Other Cat".into()];
        let second: Vec<DataType> = vec![20.into(), "Cat".into()];
        state.add_key(&[0], None);
        state.add_key(&[1], None);
        state.process_records(
            &mut vec![first.clone(), duplicate.clone(), second.clone()].into(),
            None,
        );
        state.process_records(
            &mut vec![(first.clone(), false), (first.clone(), false)].into(),
            None,
        );

        // We only want to remove rows that match exactly, not all rows that match the key:
        match state.lookup(&[0], &KeyType::Single(&first[0])) {
            LookupResult::Some(RecordResult::Owned(rows)) => {
                assert_eq!(rows.len(), 1);
                assert_eq!(&rows[0], &duplicate);
            }
            _ => unreachable!(),
        };

        // Also shouldn't have removed other keys:
        match state.lookup(&[0], &KeyType::Single(&second[0])) {
            LookupResult::Some(RecordResult::Owned(rows)) => {
                assert_eq!(rows.len(), 1);
                assert_eq!(&rows[0], &second);
            }
            _ => unreachable!(),
        }

        // Make sure we didn't remove secondary keys pointing to different rows:
        match state.lookup(&[1], &KeyType::Single(&second[1])) {
            LookupResult::Some(RecordResult::Owned(rows)) => {
                assert_eq!(rows.len(), 1);
                assert_eq!(&rows[0], &second);
            }
            _ => unreachable!(),
        }
    }

    #[test]
    fn persistent_state_is_useful() {
        let mut state = setup_persistent("persistent_state_is_useful");
        let columns = &[0];
        assert!(!state.is_useful());
        state.add_key(columns, None);
        assert!(state.is_useful());
    }
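
    // Not part of the original test suite: a minimal sketch of the properties prefix_transform
    // needs to uphold (see the RocksDB docs quoted above it). The transform should strip the
    // (epoch, seq) suffix from a non-unique key, and be idempotent:
    // prefix(prefix(key)) == prefix(key).
    #[test]
    fn persistent_state_prefix_transform_properties() {
        let value: DataType = 10.into();
        let key = KeyType::Single(&value);
        let with_seq = PersistentState::serialize_raw_key(&key, (1u64, 2u64));
        let prefix = PersistentState::serialize_prefix(&key);
        assert_eq!(prefix_transform(&with_seq), &prefix[..]);
        assert_eq!(prefix_transform(&prefix), &prefix[..]);
    }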

    #[test]
    fn persistent_state_rows() {
        let mut state = setup_persistent("persistent_state_rows");
        let mut rows = vec![];
        for i in 0..30 {
            let row = vec![DataType::from(i); 30];
            rows.push(row);
            state.add_key(&[i], None);
        }

        for row in rows.iter().cloned() {
            insert(&mut state, row);
        }

        let count = state.rows();
        // rows() is estimated, but we want to make sure we at least don't return
        // self.indices.len() * rows.len() here.
        assert!(count > 0 && count < rows.len() * 2);
    }

    #[test]
    fn persistent_state_deep_size_of() {
        let state = setup_persistent("persistent_state_deep_size_of");
        let size = state.deep_size_of();
        assert_eq!(size, 0);
    }
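
    // Not part of the original test suite: a minimal sketch checking that keys() reports the
    // column sets of all registered indices in the order they were added.
    #[test]
    fn persistent_state_keys_reports_indices() {
        let mut state = setup_persistent("persistent_state_keys_reports_indices");
        state.add_key(&[0], None);
        state.add_key(&[1, 2], None);
        assert_eq!(state.keys(), vec![vec![0], vec![1, 2]]);
    }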

    #[test]
    fn persistent_state_dangling_indices() {
        let (_dir, name) = get_tmp_path();
        let mut rows = vec![];
        for i in 0..10 {