Skip to content

Commit

Permalink
feat: Expose wait_for_compact (rust-rocksdb#841)
Browse files Browse the repository at this point in the history
  • Loading branch information
zaidoon1 authored and zaidoon committed Feb 19, 2024
1 parent 472f583 commit 3a2bab8
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 3 deletions.
20 changes: 19 additions & 1 deletion src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::{
ColumnFamily, ColumnFamilyDescriptor, CompactOptions, DBIteratorWithThreadMode,
DBPinnableSlice, DBRawIteratorWithThreadMode, DBWALIterator, Direction, Error, FlushOptions,
IngestExternalFileOptions, IteratorMode, Options, ReadOptions, SnapshotWithThreadMode,
WriteBatch, WriteOptions, DEFAULT_COLUMN_FAMILY_NAME,
WaitForCompactOptions, WriteBatch, WriteOptions, DEFAULT_COLUMN_FAMILY_NAME,
};

use crate::ffi_util::CSlice;
Expand Down Expand Up @@ -1746,6 +1746,24 @@ impl<T: ThreadMode, D: DBInner> DBCommon<T, D> {
}
}

/// Wait for all flush and compactions jobs to finish. Jobs to wait include the
/// unscheduled (queued, but not scheduled yet).
///
/// NOTE: This may also never return if there's sufficient ongoing writes that
/// keeps flush and compaction going without stopping. The user would have to
/// cease all the writes to DB to make this eventually return in a stable
/// state. The user may also use timeout option in WaitForCompactOptions to
/// make this stop waiting and return when timeout expires.
pub fn wait_for_compact(&self, opts: &WaitForCompactOptions) -> Result<(), Error> {
unsafe {
ffi_try!(ffi::rocksdb_wait_for_compact(
self.inner.inner(),
opts.inner
));
}
Ok(())
}

pub fn set_options(&self, opts: &[(&str, &str)]) -> Result<(), Error> {
let copts = convert_options(opts)?;
let cnames: Vec<*const c_char> = copts.iter().map(|opt| opt.0.as_ptr()).collect();
Expand Down
59 changes: 59 additions & 0 deletions src/db_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3981,6 +3981,65 @@ impl CompactOptions {
}
}

pub struct WaitForCompactOptions {
pub(crate) inner: *mut ffi::rocksdb_wait_for_compact_options_t,
}

impl Default for WaitForCompactOptions {
fn default() -> Self {
let opts = unsafe { ffi::rocksdb_wait_for_compact_options_create() };
assert!(
!opts.is_null(),
"Could not create RocksDB Wait For Compact Options"
);

Self { inner: opts }
}
}

impl Drop for WaitForCompactOptions {
fn drop(&mut self) {
unsafe {
ffi::rocksdb_wait_for_compact_options_destroy(self.inner);
}
}
}

impl WaitForCompactOptions {
/// If true, abort waiting if background jobs are paused. If false,
/// ContinueBackgroundWork() must be called to resume the background jobs.
/// Otherwise, jobs that were queued, but not scheduled yet may never finish
/// and WaitForCompact() may wait indefinitely (if timeout is set, it will
/// abort after the timeout).
///
/// Default: false
pub fn set_abort_on_pause(&mut self, v: bool) {
unsafe {
ffi::rocksdb_wait_for_compact_options_set_abort_on_pause(self.inner, c_uchar::from(v));
}
}

/// If true, flush all column families before starting to wait.
///
/// Default: false
pub fn set_flush(&mut self, v: bool) {
unsafe {
ffi::rocksdb_wait_for_compact_options_set_flush(self.inner, c_uchar::from(v));
}
}

/// Timeout in microseconds for waiting for compaction to complete.
/// when timeout == 0, WaitForCompact() will wait as long as there's background
/// work to finish.
///
/// Default: 0
pub fn set_timeout(&mut self, microseconds: u64) {
unsafe {
ffi::rocksdb_wait_for_compact_options_set_timeout(self.inner, microseconds);
}
}
}

/// Represents a path where sst files can be put into
pub struct DBPath {
pub(crate) inner: *mut ffi::rocksdb_dbpath_t,
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ pub use crate::{
DBRecoveryMode, DataBlockIndexType, FifoCompactOptions, FlushOptions,
IngestExternalFileOptions, KeyEncodingType, LogLevel, MemtableFactory, Options,
PlainTableFactoryOptions, ReadOptions, ReadTier, UniversalCompactOptions,
UniversalCompactionStopStyle, WriteOptions,
UniversalCompactionStopStyle, WaitForCompactOptions, WriteOptions,
},
db_pinnable_slice::DBPinnableSlice,
env::Env,
Expand Down
34 changes: 33 additions & 1 deletion tests/test_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use speedb::{
ColumnFamilyDescriptor, CompactOptions, CuckooTableOptions, DBAccess, DBCompactionStyle,
DBWithThreadMode, Env, Error, ErrorKind, FifoCompactOptions, IteratorMode, MultiThreaded,
Options, PerfContext, PerfMetric, ReadOptions, SingleThreaded, SliceTransform, Snapshot,
UniversalCompactOptions, UniversalCompactionStopStyle, WriteBatch, DB,
UniversalCompactOptions, UniversalCompactionStopStyle, WaitForCompactOptions, WriteBatch, DB,
DEFAULT_COLUMN_FAMILY_NAME,
};
use util::{assert_iter, pair, DBPath};
Expand Down Expand Up @@ -826,6 +826,38 @@ fn fifo_compaction_test() {
}
}

#[test]
fn wait_for_compact_test() {
let path = DBPath::new("_rust_rocksdb_wait_for_compact_test");
{
let mut opts = Options::default();
opts.create_if_missing(true);
opts.create_missing_column_families(true);

// set wait for compact options
let mut wait_for_compact_opts: WaitForCompactOptions = WaitForCompactOptions::default();
wait_for_compact_opts.set_abort_on_pause(false);
wait_for_compact_opts.set_flush(true);

let cfs = vec!["cf1"];
let db = DB::open_cf(&opts, &path, cfs).unwrap();
let cf1 = db.cf_handle("cf1").unwrap();
db.put_cf(&cf1, b"k1", b"v1").unwrap();
db.put_cf(&cf1, b"k2", b"v2").unwrap();
db.put_cf(&cf1, b"k3", b"v3").unwrap();
db.put_cf(&cf1, b"k4", b"v4").unwrap();
db.put_cf(&cf1, b"k5", b"v5").unwrap();

db.put(b"k1", b"v1").unwrap();
db.put(b"k2", b"v2").unwrap();
db.put(b"k3", b"v3").unwrap();
db.put(b"k4", b"v4").unwrap();
db.put(b"k5", b"v5").unwrap();

db.wait_for_compact(&wait_for_compact_opts).unwrap()
}
}

#[test]
fn env_and_dbpaths_test() {
let path = DBPath::new("_rust_rocksdb_dbpath_test");
Expand Down

0 comments on commit 3a2bab8

Please sign in to comment.