diff --git a/src/main/protobuf/fossildbapi.proto b/src/main/protobuf/fossildbapi.proto index a688b4f..5e41354 100644 --- a/src/main/protobuf/fossildbapi.proto +++ b/src/main/protobuf/fossildbapi.proto @@ -130,6 +130,13 @@ message RestoreFromBackupReply { optional string errorMessage = 2; } +message CompactAllDataRequest {} + +message CompactAllDataReply { + required bool success = 1; + optional string errorMessage = 2; +} + service FossilDB { rpc Health (HealthRequest) returns (HealthReply) {} @@ -143,4 +150,5 @@ service FossilDB { rpc ListVersions (ListVersionsRequest) returns (ListVersionsReply) {} rpc Backup (BackupRequest) returns (BackupReply) {} rpc RestoreFromBackup (RestoreFromBackupRequest) returns (RestoreFromBackupReply) {} + rpc CompactAllData (CompactAllDataRequest) returns (CompactAllDataReply) {} } \ No newline at end of file diff --git a/src/main/scala/com/scalableminds/fossildb/FossilDBGrpcImpl.scala b/src/main/scala/com/scalableminds/fossildb/FossilDBGrpcImpl.scala index fe46156..59b4458 100644 --- a/src/main/scala/com/scalableminds/fossildb/FossilDBGrpcImpl.scala +++ b/src/main/scala/com/scalableminds/fossildb/FossilDBGrpcImpl.scala @@ -90,6 +90,10 @@ class FossilDBGrpcImpl(storeManager: StoreManager) RestoreFromBackupReply(true) } {errorMsg => RestoreFromBackupReply(false, errorMsg)} + override def compactAllData(req: CompactAllDataRequest) = withExceptionHandler(req) { + storeManager.compactAllData + CompactAllDataReply(true) + } {errorMsg => CompactAllDataReply(false, errorMsg)} private def withExceptionHandler [T, R <: GeneratedMessage](request: R)(tryBlock: => T)(onErrorBlock: Option[String] => T): Future[T] = { try { diff --git a/src/main/scala/com/scalableminds/fossildb/db/RocksDBStore.scala b/src/main/scala/com/scalableminds/fossildb/db/RocksDBStore.scala index 35087db..d3ae962 100644 --- a/src/main/scala/com/scalableminds/fossildb/db/RocksDBStore.scala +++ b/src/main/scala/com/scalableminds/fossildb/db/RocksDBStore.scala @@ -21,17 +21,9 @@ class RocksDBManager(dataDir: Path, columnFamilies: List[String], optionsFilePat val (db: RocksDB, columnFamilyHandles) = { RocksDB.loadLibrary() - val columnOptions = new ColumnFamilyOptions() - .setArenaBlockSize(4 * 1024 * 1024) // 4MB - .setTargetFileSizeBase(1024 * 1024 * 1024) // 1GB - .setMaxBytesForLevelBase(10 * 1024 * 1024 * 1024) // 10GB - val columnFamilyDescriptors = (columnFamilies.map(_.getBytes) :+ RocksDB.DEFAULT_COLUMN_FAMILY).map { columnFamily => - new ColumnFamilyDescriptor(columnFamily, columnOptions) - } - val columnFamilyHandles = new util.ArrayList[ColumnFamilyHandle] - var options = new DBOptions() - var cfListRef: mutable.Buffer[ColumnFamilyDescriptor] = mutable.Buffer() - optionsFilePathOpt.map { optionsFilePath => + val options = new DBOptions() + val cfListRef: mutable.Buffer[ColumnFamilyDescriptor] = mutable.Buffer() + optionsFilePathOpt.foreach { optionsFilePath => try { org.rocksdb.OptionsUtil.loadOptionsFromFile(optionsFilePath, Env.getDefault, options, cfListRef.asJava) logger.info("successfully loaded rocksdb options from " + optionsFilePath) @@ -41,10 +33,12 @@ class RocksDBManager(dataDir: Path, columnFamilies: List[String], optionsFilePat } } } - options = options - .setCreateIfMissing(true) - .setCreateMissingColumnFamilies(true) + options.setCreateIfMissing(true).setCreateMissingColumnFamilies(true) + val defaultColumnFamilyOptions = cfListRef.find(_.getName sameElements RocksDB.DEFAULT_COLUMN_FAMILY).map(_.getOptions).getOrElse(new ColumnFamilyOptions()) + val newColumnFamilyDescriptors = (columnFamilies.map(_.getBytes) :+ RocksDB.DEFAULT_COLUMN_FAMILY).diff(cfListRef.toList.map(_.getName)).map(new ColumnFamilyDescriptor(_, defaultColumnFamilyOptions)) + val columnFamilyDescriptors = cfListRef.toList ::: newColumnFamilyDescriptors logger.info("Opening RocksDB at " + dataDir.toAbsolutePath) + val columnFamilyHandles = new util.ArrayList[ColumnFamilyHandle] val db = RocksDB.open( options, dataDir.toAbsolutePath.toString, @@ -61,7 +55,7 @@ class RocksDBManager(dataDir: Path, columnFamilies: List[String], optionsFilePat if (!Files.exists(backupDir) || !Files.isDirectory(backupDir)) Files.createDirectories(backupDir) - RocksDB.loadLibrary + RocksDB.loadLibrary() val backupEngine = BackupEngine.open(Env.getDefault, new BackupableDBOptions(backupDir.toString)) backupEngine.createNewBackup(db) backupEngine.purgeOldBackups(1) @@ -71,12 +65,19 @@ class RocksDBManager(dataDir: Path, columnFamilies: List[String], optionsFilePat def restoreFromBackup(backupDir: Path) = { logger.info("Restoring from backup. RocksDB temporarily unavailable") close() - RocksDB.loadLibrary + RocksDB.loadLibrary() val backupEngine = BackupEngine.open(Env.getDefault, new BackupableDBOptions(backupDir.toString)) backupEngine.restoreDbFromLatestBackup(dataDir.toString, dataDir.toString, new RestoreOptions(true)) logger.info("Restoring from backup complete. Reopening RocksDB") } + def compactAllData() = { + logger.info("Compacting all data") + RocksDB.loadLibrary() + db.compactRange() + logger.info("All data has been compacted to last level containing data") + } + def close(): Future[Unit] = { logger.info("Closing RocksDB handle") Future.successful(db.close()) @@ -100,7 +101,7 @@ class RocksDBIterator(it: RocksIterator, prefix: Option[String]) extends Iterato override def hasNext: Boolean = it.isValid && prefix.forall(it.key().startsWith(_)) override def next: KeyValuePair[Array[Byte]] = { - val value = KeyValuePair(new String(it.key().map(_.toChar)) , it.value()) + val value = KeyValuePair(new String(it.key().map(_.toChar)), it.value()) it.next() value } diff --git a/src/main/scala/com/scalableminds/fossildb/db/StoreManager.scala b/src/main/scala/com/scalableminds/fossildb/db/StoreManager.scala index 3e08c08..b09987e 100644 --- a/src/main/scala/com/scalableminds/fossildb/db/StoreManager.scala +++ b/src/main/scala/com/scalableminds/fossildb/db/StoreManager.scala @@ -66,6 +66,14 @@ class StoreManager(dataDir: Path, backupDir: Path, columnFamilies: List[String], } } + def compactAllData = { + failDuringBackup + failDuringRestore + try { + rocksDBManager.get.compactAllData() + } + } + def close = { rocksDBManager.map(_.close) }