Skip to content

Commit

Permalink
Expose flush_cfs_opt to flush multiple column families (#793)
Browse files Browse the repository at this point in the history
  • Loading branch information
lizhanhui committed Jun 23, 2023
1 parent e0dab6a commit 023f829
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 0 deletions.
22 changes: 22 additions & 0 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -915,6 +915,28 @@ impl<T: ThreadMode, D: DBInner> DBCommon<T, D> {
Ok(())
}

/// Flushes multiple column families.
///
/// If atomic flush is not enabled, it is equivalent to calling flush_cf multiple times.
/// If atomic flush is enabled, it will flush all column families specified in `cfs` up to the latest sequence
/// number at the time when flush is requested.
pub fn flush_cfs_opt(
&self,
cfs: &[&impl AsColumnFamilyRef],
opts: &FlushOptions,
) -> Result<(), Error> {
let mut cfs = cfs.iter().map(|cf| cf.inner()).collect::<Vec<_>>();
unsafe {
ffi_try!(ffi::rocksdb_flush_cfs(
self.inner.inner(),
opts.inner,
cfs.as_mut_ptr(),
cfs.len() as libc::c_int,
));
}
Ok(())
}

/// Flushes database memtables to SST files on the disk for a given column family using default
/// options.
pub fn flush_cf(&self, cf: &impl AsColumnFamilyRef) -> Result<(), Error> {
Expand Down
45 changes: 45 additions & 0 deletions tests/test_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1397,3 +1397,48 @@ fn cuckoo() {
assert!(db.get(b"k1").unwrap().is_none());
}
}

#[test]
fn test_atomic_flush_cfs() {
let n = DBPath::new("_rust_rocksdb_atomic_flush_cfs");
{
let mut opts = Options::default();
opts.create_if_missing(true);
opts.create_missing_column_families(true);
opts.set_atomic_flush(true);

let db = DB::open_cf(&opts, &n, ["cf1", "cf2"]).unwrap();
let cf1 = db.cf_handle("cf1").unwrap();
let cf2 = db.cf_handle("cf2").unwrap();

let mut write_options = rocksdb::WriteOptions::new();
write_options.disable_wal(true);

db.put_cf_opt(&cf1, "k11", "v11", &write_options).unwrap();
db.put_cf_opt(&cf2, "k21", "v21", &write_options).unwrap();

let mut opts = rocksdb::FlushOptions::new();
opts.set_wait(true);
db.flush_cfs_opt(&[&cf1, &cf2], &opts).unwrap();
}

{
let mut opts = Options::default();
opts.create_if_missing(true);
opts.create_missing_column_families(true);
opts.set_atomic_flush(true);

let db = DB::open_cf(&opts, &n, ["cf1", "cf2"]).unwrap();
let cf1 = db.cf_handle("cf1").unwrap();
let cf2 = db.cf_handle("cf2").unwrap();

assert_eq!(
db.get_cf(&cf1, "k11").unwrap(),
Some("v11".as_bytes().to_vec())
);
assert_eq!(
db.get_cf(&cf2, "k21").unwrap(),
Some("v21".as_bytes().to_vec())
);
}
}

0 comments on commit 023f829

Please sign in to comment.