Merge pull request #194 from Syndica/dnut/blockstore
feat(blockstore): blockstore database
dnut authored Aug 9, 2024
2 parents cccc96c + 10c1fd5 commit 988d6f6
Showing 17 changed files with 2,246 additions and 41 deletions.
6 changes: 6 additions & 0 deletions build.zig
@@ -36,6 +36,9 @@ pub fn build(b: *Build) void {
const curl_dep = b.dependency("curl", dep_opts);
const curl_mod = curl_dep.module("curl");

const rocksdb_dep = b.dependency("rocksdb", dep_opts);
const rocksdb_mod = rocksdb_dep.module("rocksdb-bindings");

// expose Sig as a module
const sig_mod = b.addModule("sig", .{
.root_source_file = b.path("src/lib.zig"),
@@ -46,6 +49,7 @@ pub fn build(b: *Build) void {
sig_mod.addImport("httpz", httpz_mod);
sig_mod.addImport("zstd", zstd_mod);
sig_mod.addImport("curl", curl_mod);
sig_mod.addImport("rocksdb", rocksdb_mod);

// main executable
const sig_exe = b.addExecutable(.{
@@ -61,6 +65,7 @@ pub fn build(b: *Build) void {
sig_exe.root_module.addImport("zig-cli", zig_cli_module);
sig_exe.root_module.addImport("zig-network", zig_network_module);
sig_exe.root_module.addImport("zstd", zstd_mod);
sig_exe.root_module.addImport("rocksdb", rocksdb_mod);

const main_exe_run = b.addRunArtifact(sig_exe);
main_exe_run.addArgs(b.args orelse &.{});
@@ -79,6 +84,7 @@ pub fn build(b: *Build) void {
unit_tests_exe.root_module.addImport("httpz", httpz_mod);
unit_tests_exe.root_module.addImport("zig-network", zig_network_module);
unit_tests_exe.root_module.addImport("zstd", zstd_mod);
unit_tests_exe.root_module.addImport("rocksdb", rocksdb_mod);

const unit_tests_exe_run = b.addRunArtifact(unit_tests_exe);
test_step.dependOn(&unit_tests_exe_run.step);
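With the module registered under the name "rocksdb" in each addImport call above, source files in these modules can pull in the bindings with a plain import. As a minimal sketch (the alias is arbitrary; the new rocksdb wrapper file itself is not shown in this excerpt):

// The import name must match the addImport calls in build.zig.
const rocksdb = @import("rocksdb");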
4 changes: 4 additions & 0 deletions build.zig.zon
@@ -31,5 +31,9 @@
.url = "https://github.com/jiacai2050/zig-curl/archive/8a3f45798a80a5de4c11c6fa44dab8785c421d27.tar.gz",
.hash = "1220f70ac854b59315a8512861e039648d677feb4f9677bd873d6b9b7074a5906485",
},
.rocksdb = .{
.url = "https://github.com/Syndica/rocksdb-zig/archive/21ac0ae13c9ac38706e6b76117f32bab5158d71f.tar.gz",
.hash = "12207bec9b5a101d0ec476fb2bb8877b15afb6cb6edc5c98851be7e3e193d15e5759",
},
},
}
7 changes: 7 additions & 0 deletions src/blockstore/blockstore.zig
@@ -0,0 +1,7 @@
const sig = @import("../lib.zig");

pub const BlockstoreDB = sig.blockstore.rocksdb.RocksDB(&sig.blockstore.schema.list);

test BlockstoreDB {
sig.blockstore.database.assertIsDatabase(BlockstoreDB);
}
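The `sig.blockstore.schema.list` referenced here is a comptime slice of `ColumnFamily` values (the type is defined in `src/blockstore/database.zig` below). The schema module itself is not among the files shown in this excerpt; as a rough, hypothetical sketch of its shape (names and column families invented for illustration):

// Hypothetical sketch only; the real sig.blockstore.schema is not
// part of the files shown in this diff.
const sig = @import("../lib.zig");
const ColumnFamily = sig.blockstore.database.ColumnFamily;

pub const slot_meta = ColumnFamily{
    .name = "slot_meta",
    .Key = u64, // slot number
    .Value = []const u8, // serialized metadata; type invented here
};

pub const data_shred = ColumnFamily{
    .name = "data_shred",
    .Key = u64,
    .Value = []const u8,
};

// A slice of this list is what RocksDB(&sig.blockstore.schema.list)
// above receives.
pub const list = [_]ColumnFamily{ slot_meta, data_shred };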
212 changes: 212 additions & 0 deletions src/blockstore/database.zig
@@ -0,0 +1,212 @@
const std = @import("std");
const sig = @import("../lib.zig");

const Allocator = std.mem.Allocator;

const Logger = sig.trace.Logger;

pub fn assertIsDatabase(comptime Impl: type) void {
sig.utils.interface.assertSameInterface(Database(Impl), Impl, .subset);
sig.utils.interface.assertSameInterface(Database(Impl).WriteBatch, Impl.WriteBatch, .subset);
}

/// Runs all tests in `tests`
pub fn testDatabase(comptime Impl: fn ([]const ColumnFamily) type) !void {
assertIsDatabase(Impl(&.{}));
inline for (@typeInfo(tests(Impl)).Struct.decls) |decl| {
try @call(.auto, @field(tests(Impl), decl.name), .{});
}
}

/// Interface defining the blockstore's dependency on a database
pub fn Database(comptime Impl: type) type {
return struct {
impl: Impl,

const Self = @This();

pub fn open(
allocator: Allocator,
logger: Logger,
path: []const u8,
) anyerror!Database(Impl) {
return .{
.impl = try Impl.open(allocator, logger, path),
};
}

pub fn deinit(self: *Self) void {
self.impl.deinit();
}

pub fn put(
self: *Self,
comptime cf: ColumnFamily,
key: cf.Key,
value: cf.Value,
) anyerror!void {
return try self.impl.put(cf, key, value);
}

pub fn get(self: *Self, comptime cf: ColumnFamily, key: cf.Key) anyerror!?cf.Value {
return try self.impl.get(cf, key);
}

/// Returns a reference to the serialized bytes.
///
/// This is useful in two situations:
///
/// 1. You don't plan to deserialize the data, and just need the bytes.
///
/// 2. `cf.Value` is []const u8, and you don't need an owned slice. In this
/// case, getBytes is faster than get. But if you *do* need an owned slice,
/// then it's faster to call `get` instead of calling this function followed
/// by memcpy.
pub fn getBytes(
self: *Self,
comptime cf: ColumnFamily,
key: cf.Key,
) anyerror!?BytesRef {
return try self.impl.getBytes(cf, key);
}

pub fn delete(self: *Self, comptime cf: ColumnFamily, key: cf.Key) anyerror!void {
return try self.impl.delete(cf, key);
}

pub fn initWriteBatch(self: *Self) anyerror!WriteBatch {
return .{ .impl = self.impl.initBatch() };
}

pub fn commit(self: *Self, batch: WriteBatch) anyerror!void {
return self.impl.commit(batch.impl);
}

/// A write batch is a sequence of operations that execute atomically.
/// This is typically called a "transaction" in most databases.
///
/// Use this instead of Database.put or Database.delete when you need
/// to ensure that a group of operations are either all executed
/// successfully, or none of them are executed.
///
/// It is called a write batch instead of a transaction because:
/// - rocksdb uses the name "write batch" for this concept
/// - this name avoids confusion with solana transactions
pub const WriteBatch = struct {
impl: Impl.WriteBatch,

pub fn put(
self: *WriteBatch,
comptime cf: ColumnFamily,
key: cf.Key,
value: cf.Value,
) anyerror!void {
return try self.impl.put(cf, key, value);
}

pub fn delete(self: *WriteBatch, comptime cf: ColumnFamily, key: cf.Key) anyerror!void {
return try self.impl.delete(cf, key);
}
};
};
}

pub const ColumnFamily = struct {
name: []const u8,
Key: type,
Value: type,

const Self = @This();

/// At comptime, find this family in a slice. Useful for fast runtime
/// access of data in other slices that are one-to-one with this slice.
pub fn find(comptime self: Self, comptime column_families: []const Self) comptime_int {
for (column_families, 0..) |column_family, i| {
if (std.mem.eql(u8, column_family.name, self.name)) {
return i;
}
}
@compileError("not found");
}
};

/// Bincode-based serializer that should be usable by database implementations.
pub const serializer = struct {
/// Returned slice is owned by the caller. Free with `allocator.free`.
pub fn serializeAlloc(allocator: Allocator, item: anytype) ![]const u8 {
const buf = try allocator.alloc(u8, try sig.bincode.sizeOf(item, .{}));
return try sig.bincode.writeToSlice(buf, item, .{});
}

/// Returned data may or may not be owned by the caller.
/// Do both:
/// - Assume the data is owned by the scope where `item` originated,
/// so finish using the slice before returning from the caller (do not store slice as-is)
/// - Call BytesRef.deinit before returning from the caller (as if you own it).
///
/// Use this if the database backend accepts a pointer and immediately calls memcpy.
pub fn serializeToRef(allocator: Allocator, item: anytype) !BytesRef {
return if (@TypeOf(item) == []const u8 or @TypeOf(item) == []u8) .{
.allocator = null,
.data = item,
} else .{
.allocator = allocator,
.data = try serializeAlloc(allocator, item),
};
}

/// Returned data is owned by the caller. Free with `allocator.free`.
pub fn deserialize(comptime T: type, allocator: Allocator, bytes: []const u8) !T {
return try sig.bincode.readFromSlice(allocator, T, bytes, .{});
}
};

pub const BytesRef = struct {
allocator: ?Allocator = null,
data: []const u8,

pub fn deinit(self: @This()) void {
if (self.allocator) |a| a.free(self.data);
}
};

/// Test cases that can be applied to any implementation of Database
fn tests(comptime Impl: fn ([]const ColumnFamily) type) type {
return struct {
fn basic() !void {
const Value = struct { hello: u16 };
const cf1 = ColumnFamily{
.name = "one",
.Key = u64,
.Value = Value,
};
const cf2 = ColumnFamily{
.name = "two",
.Key = u64,
.Value = Value,
};
const allocator = std.testing.allocator;
const logger = Logger.init(std.testing.allocator, Logger.TEST_DEFAULT_LEVEL);
defer logger.deinit();
var db = try Database(Impl(&.{ cf1, cf2 })).open(
allocator,
logger,
"test_data/bsdb",
);
defer db.deinit();
try db.put(cf1, 123, .{ .hello = 345 });
const got = try db.get(cf1, 123);
try std.testing.expect(345 == got.?.hello);
const not = try db.get(cf2, 123);
try std.testing.expect(null == not);
const wrong_was_deleted = try db.delete(cf2, 123);
_ = wrong_was_deleted;
// try std.testing.expect(!wrong_was_deleted); // FIXME
const was_deleted = try db.delete(cf1, 123);
_ = was_deleted;
// try std.testing.expect(was_deleted);
const not_now = try db.get(cf1, 123);
try std.testing.expect(null == not_now);
}
};
}
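Putting the pieces together, typical usage of this interface looks roughly like the sketch below. The column family, database path, and function are invented for illustration; the calls mirror the `Database`, `WriteBatch`, and `BytesRef` declarations above, using the `RocksDB` implementation that `blockstore.zig` wires in:

const std = @import("std");
const sig = @import("../lib.zig");

const ColumnFamily = sig.blockstore.database.ColumnFamily;
const Database = sig.blockstore.database.Database;
const RocksDB = sig.blockstore.rocksdb.RocksDB;

// Invented column family for illustration.
const cf_meta = ColumnFamily{ .name = "meta", .Key = u64, .Value = u64 };

fn example(allocator: std.mem.Allocator, logger: sig.trace.Logger) !void {
    var db = try Database(RocksDB(&.{cf_meta})).open(allocator, logger, "path/to/db");
    defer db.deinit();

    // Single operations go through put/get/delete directly.
    try db.put(cf_meta, 1, 42);

    // Grouped operations execute atomically via a write batch;
    // nothing is applied until db.commit runs.
    var batch = try db.initWriteBatch();
    try batch.put(cf_meta, 2, 43);
    try batch.delete(cf_meta, 1);
    try db.commit(batch);

    // getBytes skips deserialization when only the raw bytes are needed.
    // Call BytesRef.deinit either way; it is a no-op when no allocator
    // owns the data.
    if (try db.getBytes(cf_meta, 2)) |bytes| {
        defer bytes.deinit();
        std.debug.print("raw bytes: {any}\n", .{bytes.data});
    }
}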