Skip to content

Commit

Permalink
add WriteBufferManager support
Browse files Browse the repository at this point in the history
  • Loading branch information
benoitmeriaux committed Jan 3, 2024
1 parent 66f04df commit 2a91da0
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
* Bump MSRV to 1.63.0 (mina86)
* Convert properties to `&PropName` which can be converted at no cost to `&CStr`
and `&str` (mina86)
* Add WriteBufferManager support (benoitmeriaux)

## 0.21.0 (2023-05-09)

Expand Down
145 changes: 144 additions & 1 deletion src/db_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,106 @@ use crate::{
ColumnFamilyDescriptor, Error, SnapshotWithThreadMode,
};

pub(crate) struct WriteBufferManagerWrapper {
pub(crate) inner: NonNull<ffi::rocksdb_write_buffer_manager_t>,
}

impl Drop for WriteBufferManagerWrapper {
fn drop(&mut self) {
unsafe {
ffi::rocksdb_write_buffer_manager_destroy(self.inner.as_ptr());
}
}
}

#[derive(Clone)]
pub struct WriteBufferManager(pub(crate) Arc<WriteBufferManagerWrapper>);

impl WriteBufferManager {
/// https://github.com/facebook/rocksdb/wiki/Write-Buffer-Manager
/// Write buffer manager helps users control the total memory used by memtables across multiple column families and/or DB instances.
/// Users can enable this control by 2 ways:
///
/// 1- Limit the total memtable usage across multiple column families and DBs under a threshold.
/// 2- Cost the memtable memory usage to block cache so that memory of RocksDB can be capped by the single limit.
/// The usage of a write buffer manager is similar to rate_limiter and sst_file_manager.
/// Users can create one write buffer manager object and pass it to all the options of column families or DBs whose memtable size they want to be controlled by this object.
///
/// A memory limit is given when creating the write buffer manager object. RocksDB will try to limit the total memory to under this limit.
///
/// a flush will be triggered on one column family of the DB you are inserting to,
///
/// If mutable memtable size exceeds about 90% of the limit,
/// If the total memory is over the limit, more aggressive flush may also be triggered only if the mutable memtable size also exceeds 50% of the limit.
/// Both checks are needed because if already more than half memory is being flushed, triggering more flush may not help.
///
/// The total memory is counted as total memory allocated in the arena, even if some of that may not yet be used by memtable.
///
/// buffer_size: the memory limit in bytes.
/// allow_stall: If set true, it will enable stalling of all writers when memory usage exceeds buffer_size (soft limit).
/// It will wait for flush to complete and memory usage to drop down
pub fn new_write_buffer_manager(buffer_size: size_t, allow_stall: bool) -> WriteBufferManager {
let inner = NonNull::new(unsafe {
ffi::rocksdb_write_buffer_manager_create(buffer_size, allow_stall)
})
.unwrap();
WriteBufferManager(Arc::new(WriteBufferManagerWrapper { inner }))
}

/// users can set up RocksDB to cost memory used by memtables to block cache.
/// This can happen no matter whether you enable memtable memory limit or not.
/// This option is added to manage memory (memtables + block cache) under a single limit.
///
/// buffer_size: the memory limit in bytes.
/// allow_stall: If set true, it will enable stalling of all writers when memory usage exceeds buffer_size (soft limit).
/// It will wait for flush to complete and memory usage to drop down
/// cache: the block cache instance
pub fn new_write_buffer_manager_with_cache(
buffer_size: size_t,
allow_stall: bool,
cache: Cache,
) -> WriteBufferManager {
let inner = NonNull::new(unsafe {
ffi::rocksdb_write_buffer_manager_create_with_cache(
buffer_size,
cache.0.inner.as_ptr(),
allow_stall,
)
})
.unwrap();
WriteBufferManager(Arc::new(WriteBufferManagerWrapper { inner }))
}

/// Returns the WriteBufferManager memory usage in bytes.
pub fn get_usage(&self) -> usize {
unsafe { ffi::rocksdb_write_buffer_manager_memory_usage(self.0.inner.as_ptr()) }
}

/// Returns the current buffer size in bytes.
pub fn get_buffer_size(&self) -> usize {
unsafe { ffi::rocksdb_write_buffer_manager_buffer_size(self.0.inner.as_ptr()) }
}

/// Set the buffer size in bytes.
pub fn set_buffer_size(&self, new_size: usize) {
unsafe {
ffi::rocksdb_write_buffer_manager_set_buffer_size(self.0.inner.as_ptr(), new_size)
}
}

/// Returns if WriteBufferManager is enabled.
pub fn enabled(&self) -> bool {
unsafe { ffi::rocksdb_write_buffer_manager_enabled(self.0.inner.as_ptr()) }
}

/// set the allow_stall flag.
pub fn set_allow_stall(&self, allow_stall: bool) {
unsafe {
ffi::rocksdb_write_buffer_manager_set_allow_stall(self.0.inner.as_ptr(), allow_stall)
}
}
}

pub(crate) struct CacheWrapper {
pub(crate) inner: NonNull<ffi::rocksdb_cache_t>,
}
Expand Down Expand Up @@ -109,6 +209,7 @@ pub(crate) struct OptionsMustOutliveDB {
env: Option<Env>,
row_cache: Option<Cache>,
block_based: Option<BlockBasedOptionsMustOutliveDB>,
write_buffer_manager: Option<WriteBufferManager>,
}

impl OptionsMustOutliveDB {
Expand All @@ -120,6 +221,10 @@ impl OptionsMustOutliveDB {
.block_based
.as_ref()
.map(BlockBasedOptionsMustOutliveDB::clone),
write_buffer_manager: self
.write_buffer_manager
.as_ref()
.map(WriteBufferManager::clone),
}
}
}
Expand Down Expand Up @@ -283,6 +388,7 @@ unsafe impl Send for ReadOptions {}
unsafe impl Send for IngestExternalFileOptions {}
unsafe impl Send for CacheWrapper {}
unsafe impl Send for CompactOptions {}
unsafe impl Send for WriteBufferManagerWrapper {}

// Sync is similarly safe for many types because they do not expose interior mutability, and their
// use within the rocksdb library is generally behind a const reference
Expand All @@ -294,6 +400,7 @@ unsafe impl Sync for ReadOptions {}
unsafe impl Sync for IngestExternalFileOptions {}
unsafe impl Sync for CacheWrapper {}
unsafe impl Sync for CompactOptions {}
unsafe impl Sync for WriteBufferManagerWrapper {}

impl Drop for Options {
fn drop(&mut self) {
Expand Down Expand Up @@ -3229,6 +3336,24 @@ impl Options {
);
}
}

/// https://github.com/facebook/rocksdb/wiki/Write-Buffer-Manager
/// Write buffer manager helps users control the total memory used by memtables across multiple column families and/or DB instances.
/// Users can enable this control by 2 ways:
///
/// 1- Limit the total memtable usage across multiple column families and DBs under a threshold.
/// 2- Cost the memtable memory usage to block cache so that memory of RocksDB can be capped by the single limit.
/// The usage of a write buffer manager is similar to rate_limiter and sst_file_manager.
/// Users can create one write buffer manager object and pass it to all the options of column families or DBs whose memtable size they want to be controlled by this object.
pub fn set_write_buffer_manager(&mut self, write_buffer_manager: &WriteBufferManager) {
unsafe {
ffi::rocksdb_options_set_write_buffer_manager(
self.inner,
write_buffer_manager.0.inner.as_ptr(),
);
}
self.outlive.write_buffer_manager = Some(write_buffer_manager.clone());
}
}

impl Default for Options {
Expand Down Expand Up @@ -4159,7 +4284,8 @@ impl Drop for DBPath {

#[cfg(test)]
mod tests {
use crate::{MemtableFactory, Options};
use crate::db_options::WriteBufferManager;
use crate::{Cache, MemtableFactory, Options};

#[test]
fn test_enable_statistics() {
Expand Down Expand Up @@ -4194,4 +4320,21 @@ mod tests {
let opts = Options::default();
assert!(opts.get_statistics().is_none());
}

#[test]
fn test_set_write_buffer_manager() {
let mut opts = Options::default();
let lrucache = Cache::new_lru_cache(100);
let write_buffer_manager =
WriteBufferManager::new_write_buffer_manager_with_cache(100, false, lrucache);
assert_eq!(write_buffer_manager.get_buffer_size(), 100);
assert_eq!(write_buffer_manager.get_usage(), 0);
assert!(write_buffer_manager.enabled());

opts.set_write_buffer_manager(&write_buffer_manager);
drop(opts);

// WriteBufferManager outlives options
assert!(write_buffer_manager.enabled());
}
}
10 changes: 7 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ pub use crate::{
DBRecoveryMode, DataBlockIndexType, FifoCompactOptions, FlushOptions,
IngestExternalFileOptions, KeyEncodingType, LogLevel, MemtableFactory, Options,
PlainTableFactoryOptions, ReadOptions, ReadTier, UniversalCompactOptions,
UniversalCompactionStopStyle, WaitForCompactOptions, WriteOptions,
UniversalCompactionStopStyle, WaitForCompactOptions, WriteBufferManager, WriteOptions,
},
db_pinnable_slice::DBPinnableSlice,
env::Env,
Expand Down Expand Up @@ -233,11 +233,11 @@ mod test {

use super::{
column_family::UnboundColumnFamily,
db_options::CacheWrapper,
db_options::{CacheWrapper, WriteBufferManagerWrapper},
env::{Env, EnvWrapper},
BlockBasedOptions, BoundColumnFamily, Cache, ColumnFamily, ColumnFamilyDescriptor,
DBIterator, DBRawIterator, IngestExternalFileOptions, Options, PlainTableFactoryOptions,
ReadOptions, Snapshot, SstFileWriter, WriteBatch, WriteOptions, DB,
ReadOptions, Snapshot, SstFileWriter, WriteBatch, WriteBufferManager, WriteOptions, DB,
};

#[test]
Expand Down Expand Up @@ -275,6 +275,8 @@ mod test {
is_send::<TransactionDBOptions>();
is_send::<OptimisticTransactionOptions>();
is_send::<TransactionOptions>();
is_send::<WriteBufferManager>();
is_send::<WriteBufferManagerWrapper>();
}

#[test]
Expand Down Expand Up @@ -305,5 +307,7 @@ mod test {
is_sync::<TransactionDBOptions>();
is_sync::<OptimisticTransactionOptions>();
is_sync::<TransactionOptions>();
is_sync::<WriteBufferManager>();
is_sync::<WriteBufferManagerWrapper>();
}
}

0 comments on commit 2a91da0

Please sign in to comment.