diff --git a/build.zig.zon b/build.zig.zon
index 40389fda7..eb3f6ed2f 100644
--- a/build.zig.zon
+++ b/build.zig.zon
@@ -28,8 +28,8 @@
         .hash = "122030ebe280b73693963a67ed656226a67b7f00a0a05665155da00c9fcdee90de88",
     },
     .rocksdb = .{
-        .url = "https://github.com/Syndica/rocksdb-zig/archive/6d4230e131183cccb730a7248bd4ca30c559b8bd.tar.gz",
-        .hash = "12207766d25ba350d6e2f2153fc74a2b3ff204224e1c08adf211cd9e400075033898",
+        .url = "https://github.com/Syndica/rocksdb-zig/archive/cf966c0c6bb6205a08bb2006f399719542e4e770.tar.gz",
+        .hash = "1220d25e3ef18526cb6c980b441f05dc96330bedd88a512dcb3aed775a981ce3707d",
     },
     .lsquic = .{
         .url = "https://github.com/Syndica/lsquic/archive/ed6ced0cbc6447f7135a32db491e398debdf8af7.tar.gz",
diff --git a/src/ledger/benchmarks.zig b/src/ledger/benchmarks.zig
index d71d5a948..fc81f8b51 100644
--- a/src/ledger/benchmarks.zig
+++ b/src/ledger/benchmarks.zig
@@ -58,7 +58,8 @@ pub const BenchmarkLedger = struct {
         }

         var timer = try sig.time.Timer.start();
-        _ = try inserter.insertShreds(shreds, is_repairs, null, false, null, null);
+        const result = try inserter.insertShreds(shreds, is_repairs, .{});
+        defer result.deinit();
         return timer.read();
     }

@@ -112,7 +113,9 @@ pub const BenchmarkLedger = struct {

         const total_shreds = shreds.len;

-        _ = try ledger.shred_inserter.shred_inserter.insertShredsForTest(&inserter, shreds);
+        const result = try ledger.shred_inserter.shred_inserter
+            .insertShredsForTest(&inserter, shreds);
+        result.deinit();

         const slot: u32 = 0;
         const num_reads = total_shreds / 15;
@@ -141,7 +144,9 @@ pub const BenchmarkLedger = struct {
         defer deinitShreds(allocator, shreds);
         const total_shreds = shreds.len;

-        _ = try ledger.shred_inserter.shred_inserter.insertShredsForTest(&inserter, shreds);
+        const result = try ledger.shred_inserter.shred_inserter
+            .insertShredsForTest(&inserter, shreds);
+        result.deinit();

         const num_reads = total_shreds / 15;
         const slot: u32 = 0;
@@ -166,7 +171,7 @@ pub const BenchmarkLedger = struct {
         defer state.deinit();
         var reader = try state.reader();
         const result = try ledger_tests.insertDataForBlockTest(state);
-        defer result.deinit();
+        result.deinit();

         var timer = try sig.time.Timer.start();
         _ = try reader.getCompleteBlock(result.slot + 2, true);
@@ -178,7 +183,7 @@ pub const BenchmarkLedger = struct {
         defer state.deinit();
         var reader = try state.reader();
         const result = try ledger_tests.insertDataForBlockTest(state);
-        defer result.deinit();
+        result.deinit();

         var timer = try sig.time.Timer.start();
         const shreds = try reader.getDataShredsForSlot(result.slot + 2, 0);
@@ -192,7 +197,7 @@ pub const BenchmarkLedger = struct {
         defer state.deinit();
         var reader = try state.reader();
         const result = try ledger_tests.insertDataForBlockTest(state);
-        defer result.deinit();
+        result.deinit();

         var timer = try sig.time.Timer.start();
         const items = try reader.getSlotEntriesWithShredInfo(result.slot + 2, 0, true);
@@ -213,7 +218,9 @@ pub const BenchmarkLedger = struct {
         defer deinitShreds(allocator, shreds);
         const total_shreds = shreds.len;

-        _ = try ledger.shred_inserter.shred_inserter.insertShredsForTest(&inserter, shreds);
+        const result = try ledger.shred_inserter.shred_inserter
+            .insertShredsForTest(&inserter, shreds);
+        result.deinit();

         const slot: u32 = 1;

@@ -243,7 +250,9 @@ pub const BenchmarkLedger = struct {
         const shreds = try testShreds(std.heap.c_allocator, shreds_path);
         defer deinitShreds(allocator, shreds);

-        _ = try ledger.shred_inserter.shred_inserter.insertShredsForTest(&inserter, shreds);
+        const result = try ledger.shred_inserter.shred_inserter
+            .insertShredsForTest(&inserter, shreds);
+        result.deinit();

         const slot = 1;
         const start_index = 0;
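Every benchmark call site above changes the same way: `insertShreds` now returns a `Result` that owns its `duplicate_shreds` and `completed_data_set_infos`, and the trailing `null, false, null, null` arguments collapse into an `Options` struct (both introduced in src/ledger/shred_inserter/shred_inserter.zig below). A minimal sketch of the new calling convention — the import paths and the wrapper function are illustrative assumptions, not part of this diff:

```zig
const sig = @import("sig");
const ShredInserter = sig.ledger.shred_inserter.shred_inserter.ShredInserter;
const Shred = sig.ledger.shred.Shred;

// Hypothetical call site: `.{}` takes every Options default
// (untrusted, no recovery, no retransmit sender, no shred tracker).
fn insertBatch(inserter: *ShredInserter, shreds: []const Shred, is_repairs: []const bool) !void {
    const result = try inserter.insertShreds(shreds, is_repairs, .{});
    // The caller owns the Result; deinit releases the duplicate shreds
    // and completed-data-set infos it carries.
    defer result.deinit();
}
```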
diff --git a/src/ledger/database/interface.zig b/src/ledger/database/interface.zig
index a7da827d9..29fd69542 100644
--- a/src/ledger/database/interface.zig
+++ b/src/ledger/database/interface.zig
@@ -277,10 +277,17 @@ pub const BytesRef = struct {
     data: []const u8,
     deinitializer: ?Deinitializer = null,

-    pub fn deinit(self: @This()) void {
+    pub fn deinit(self: BytesRef) void {
         if (self.deinitializer) |d| d.deinit(self.data);
     }

+    pub fn clone(self: BytesRef, allocator: Allocator) Allocator.Error!BytesRef {
+        return .{
+            .data = try allocator.dupe(u8, self.data),
+            .deinitializer = .{ .allocator = allocator },
+        };
+    }
+
     pub const Deinitializer = union(enum) {
         allocator: Allocator,
         rocksdb: *const fn (?*anyopaque) callconv(.C) void,
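`BytesRef.clone` exists so a value read from the database — whose `deinitializer` may be a RocksDB free function tied to another lifetime — can be copied into plain allocator-owned memory. A sketch of the intended use, assuming a `BytesRef` (the type defined in this file) obtained from `db.getBytes`:

```zig
const std = @import("std");

// Sketch: `original` may be backed by RocksDB-owned memory. The clone is a
// plain heap copy whose Deinitializer is the allocator itself, so it can
// safely outlive the source.
fn takeOwnership(allocator: std.mem.Allocator, original: BytesRef) !BytesRef {
    const owned = try original.clone(allocator);
    original.deinit(); // safe: `owned` no longer aliases the source bytes
    return owned; // the caller eventually calls owned.deinit()
}
```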
diff --git a/src/ledger/reader.zig b/src/ledger/reader.zig
index 8dc937e06..49cf7daf3 100644
--- a/src/ledger/reader.zig
+++ b/src/ledger/reader.zig
@@ -2103,7 +2103,7 @@ test "findMissingDataIndexes" {
         .resigned = false,
     };
     shred.data.common.variant = variant;
-    try shred.data.writePayload(&(.{2} ** 100));
+    try ledger.shred.overwriteShredForTest(allocator, &shred, &(.{2} ** 100));

     var slot_meta = SlotMeta.init(allocator, shred_slot, null);
     slot_meta.last_index = 4;
@@ -2172,7 +2172,7 @@ test "getCodeShred" {
     shred.code.common.variant = variant;
     shred.code.custom.num_data_shreds = 1;
     shred.code.custom.num_code_shreds = 1;
-    try shred.code.writePayload(&(.{2} ** 100));
+    try ledger.shred.overwriteShredForTest(allocator, &shred, &(.{2} ** 100));

     const shred_slot = shred.commonHeader().slot;
     const shred_index = shred.commonHeader().index;
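Both tests swap the deleted `writePayload` method for `overwriteShredForTest` (added to src/ledger/shred.zig below), which re-serializes the headers into a fresh payload before copying the filler bytes in. Condensed from the test code above, with `allocator` and `variant` assumed in scope:

```zig
// Condensed from the tests above: reset a shred's payload for test purposes.
var shred = try Shred.fromPayload(allocator, &ledger.shred.test_data_shred);
defer shred.deinit();
shred.data.common.variant = variant;
try ledger.shred.overwriteShredForTest(allocator, &shred, &(.{2} ** 100));
```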
diff --git a/src/ledger/shred.zig b/src/ledger/shred.zig
index 476b73ffe..aa2cb0ca7 100644
--- a/src/ledger/shred.zig
+++ b/src/ledger/shred.zig
@@ -61,12 +61,25 @@ pub const Shred = union(ShredType) {
         };
     }

+    pub fn clone(self: Self) Allocator.Error!Self {
+        return switch (self) {
+            .code => |shred| .{ .code = try shred.clone() },
+            .data => |shred| .{ .data = try shred.clone() },
+        };
+    }
+
     pub fn payload(self: Self) []const u8 {
         return switch (self) {
             inline .code, .data => |shred| shred.payload,
         };
     }

+    pub fn payloadMut(self: Self) []u8 {
+        return switch (self) {
+            inline .code, .data => |shred| shred.payload,
+        };
+    }
+
     pub fn commonHeader(self: Self) CommonHeader {
         return switch (self) {
             inline .code, .data => |c| c.common,
@@ -136,12 +149,6 @@ pub const Shred = union(ShredType) {
         };
     }

-    pub fn setMerkleProof(self: *Self, proof: MerkleProofEntryList) !void {
-        return switch (self.*) {
-            inline .data, .code => |*s| @TypeOf(s.*).generic.setMerkleProof(s, proof),
-        };
-    }
-
     pub fn id(self: Self) ShredId {
         return switch (self) {
             inline .data, .code => |s| s.id(),
@@ -170,6 +177,12 @@ pub const CodeShred = struct {
         self.allocator.free(self.payload);
     }

+    pub fn clone(self: Self) Allocator.Error!Self {
+        var new = self;
+        new.payload = try new.allocator.dupe(u8, new.payload);
+        return new;
+    }
+
     /// agave: ShredCode::from_recovered_shard
     pub fn fromRecoveredShard(
         allocator: Allocator,
@@ -195,14 +208,16 @@ pub const CodeShred = struct {
         const writer = buf.writer();
         try bincode.write(writer, common_header, .{});
         try bincode.write(writer, code_header, .{});
-        var shred = Self{
+        if (chained_merkle_root) |hash|
+            try setChainedMerkleRoot(payload, common_header.variant, hash);
+        if (retransmitter_signature) |sign|
+            try setRetransmitterSignatureFor(payload, common_header.variant, sign);
+        const shred = Self{
             .allocator = allocator,
             .common = common_header,
             .custom = code_header,
             .payload = payload,
         };
-        if (chained_merkle_root) |hash| try generic.setChainedMerkleRoot(&shred, hash);
-        if (retransmitter_signature) |sign| try generic.setRetransmitterSignature(&shred, sign);
         try shred.sanitize();
         return shred;
     }
@@ -286,6 +301,12 @@ pub const DataShred = struct {
         self.allocator.free(self.payload);
     }

+    pub fn clone(self: Self) Allocator.Error!Self {
+        var new = self;
+        new.payload = try new.allocator.dupe(u8, new.payload);
+        return new;
+    }
+
     /// agave: ShredData::from_recovered_shard
     pub fn fromRecoveredShard(
         allocator: Allocator,
@@ -304,11 +325,12 @@ pub const DataShred = struct {
         @memcpy(payload[Signature.size..][0..shard_size], shard);
         @memset(payload[Signature.size + shard_size ..], 0);
         var shred = try generic.fromPayloadOwned(allocator, payload);
-        if (shard_size != try capacity(code_shred_constants, shred.common.variant)) {
+        if (shard_size != try capacity(code_shred_constants, shred.common.variant))
             return error.InvalidShardSize;
-        }
-        if (chained_merkle_root) |hash| try generic.setChainedMerkleRoot(&shred, hash);
-        if (retransmitter_signature) |sign| try generic.setRetransmitterSignature(&shred, sign);
+        if (chained_merkle_root) |hash|
+            try setChainedMerkleRoot(payload, shred.common.variant, hash);
+        if (retransmitter_signature) |sign|
+            try setRetransmitterSignatureFor(payload, shred.common.variant, sign);
         try shred.sanitize();
         return shred;
     }
@@ -385,29 +407,19 @@ pub const DataShred = struct {

 /// Analogous to [Shred trait](https://github.com/anza-xyz/agave/blob/8c5a33a81a0504fd25d0465bed35d153ff84819f/ledger/src/shred/traits.rs#L6)
 fn generic_shred(shred_type: ShredType) type {
-    const Self: type, //
-    const CustomHeader: type, //
+    const Self = switch (shred_type) {
+        .data => DataShred,
+        .code => CodeShred,
+    };
+    const CustomHeader = switch (shred_type) {
+        .data => DataHeader,
+        .code => CodeHeader,
+    };
     const constants: ShredConstants = switch (shred_type) {
-        .data => .{ DataShred, DataHeader, data_shred_constants },
-        .code => .{ CodeShred, CodeHeader, code_shred_constants },
+        .data => data_shred_constants,
+        .code => code_shred_constants,
     };
     return struct {
-        fn writePayload(self: *Self, data: []const u8) !void {
-            if (self.payload.len < constants.payload_size) {
-                return error.InvalidPayloadSize;
-            }
-            @memset(self.payload, 0);
-
-            var buf = std.io.fixedBufferStream(self.payload[0..constants.payload_size]);
-            const writer = buf.writer();
-
-            try bincode.write(writer, self.common, .{});
-            try bincode.write(writer, self.custom, .{});
-
-            const offset = writer.context.pos;
-            @memcpy(self.payload[offset .. offset + data.len], data);
-        }
-
         fn fromPayload(allocator: Allocator, payload: []const u8) !Self {
             // NOTE(x19): is it ok if payload.len > constants.payload_size? the test_data_shred is 1207 bytes
             if (payload.len < constants.payload_size) {
@@ -472,12 +484,12 @@ fn generic_shred(shred_type: ShredType) type {

         /// The return contains a pointer to data owned by the shred.
         fn merkleProof(self: *const Self) !MerkleProofEntryList {
-            return getMerkleProof(self.payload, constants, self.common.variant);
+            return getMerkleProofFor(self.payload, constants, self.common.variant);
         }

         fn merkleNode(self: Self) !Hash {
             const offset = try proofOffset(constants, self.common.variant);
-            return getMerkleNode(self.payload, Signature.size, offset);
+            return getMerkleNodeAt(self.payload, Signature.size, offset);
         }

         fn erasureShardAsSlice(self: *const Self) ![]const u8 {
@@ -511,37 +523,6 @@ fn generic_shred(shred_type: ShredType) type {
             return layout.getChainedMerkleRoot(self.payload) orelse error.InvalidPayloadSize;
         }

-        /// agave: set_chained_merkle_root
-        fn setChainedMerkleRoot(self: *Self, chained_merkle_root: Hash) !void {
-            const offset = try getChainedMerkleRootOffset(self.common.variant);
-            const end = offset + SIZE_OF_MERKLE_ROOT;
-            if (self.payload.len < end) {
-                return error.InvalidPayloadSize;
-            }
-            @memcpy(self.payload[offset..end], &chained_merkle_root.data);
-        }
-
-        /// agave: set_merkle_proof
-        fn setMerkleProof(self: *Self, proof: MerkleProofEntryList) !void {
-            try proof.sanitize();
-            const proof_size = self.common.variant.proof_size;
-            if (proof.len != proof_size) {
-                return error.InvalidMerkleProof;
-            }
-            const offset = try proofOffset(constants, self.common.variant);
-            if (self.payload.len < offset + proof.len * merkle_proof_entry_size) {
-                return error.InvalidProofSize;
-            }
-            var start = offset;
-            var proof_iterator = proof.iterator();
-            while (proof_iterator.next()) |entry| {
-                // TODO test: agave uses bincode here. does that make any difference?
-                const end = merkle_proof_entry_size + start;
-                @memcpy(self.payload[start..end], entry);
-                start = end;
-            }
-        }
-
         /// agave: retransmitter_signature
         fn retransmitterSignature(self: Self) !Signature {
             const offset = try retransmitterSignatureOffset(self.common.variant);
@@ -553,16 +534,6 @@ fn generic_shred(shred_type: ShredType) type {
             @memcpy(&sig_bytes, self.payload[offset..end]);
             return .{ .data = sig_bytes };
         }
-
-        /// agave: setRetransmitterSignature
-        fn setRetransmitterSignature(self: *Self, signature: Signature) !void {
-            const offset = try retransmitterSignatureOffset(self.common.variant);
-            const end = offset + Signature.size;
-            if (self.payload.len < end) {
-                return error.InvalidPayloadSize;
-            }
-            @memcpy(self.payload[offset..end], &signature.data);
-        }
     };
 }

@@ -601,13 +572,28 @@ fn getMerkleRoot(
         .code => codeIndex(shred) orelse return error.InvalidErasureShardIndex,
         .data => dataIndex(shred) orelse return error.InvalidErasureShardIndex,
     };
-    const proof = try getMerkleProof(shred, constants, variant);
+    const proof = try getMerkleProofFor(shred, constants, variant);
     const offset = try proofOffset(constants, variant);
-    const node = try getMerkleNode(shred, Signature.size, offset);
+    const node = try getMerkleNodeAt(shred, Signature.size, offset);
     return calculateMerkleRoot(index, node, proof);
 }

-fn getMerkleProof(
+/// agave: set_chained_merkle_root
+fn setChainedMerkleRoot(shred: []u8, variant: ShredVariant, chained_merkle_root: Hash) !void {
+    const offset = try getChainedMerkleRootOffset(variant);
+    const end = offset + SIZE_OF_MERKLE_ROOT;
+    if (shred.len < end) {
+        return error.InvalidPayloadSize;
+    }
+    @memcpy(shred[offset..end], &chained_merkle_root.data);
+}
+
+pub fn getMerkleProof(shred: []const u8) !MerkleProofEntryList {
+    const variant = layout.getShredVariant(shred) orelse return error.UnknownShredVariant;
+    return getMerkleProofFor(shred, variant.constants(), variant);
+}
+
+fn getMerkleProofFor(
     shred: []const u8,
     constants: ShredConstants,
     variant: ShredVariant,
@@ -624,7 +610,35 @@ fn getMerkleProof(
     };
 }

-fn getMerkleNode(shred: []const u8, start: usize, end: usize) !Hash {
+/// agave: set_merkle_proof
+pub fn setMerkleProof(shred: []u8, proof: MerkleProofEntryList) !void {
+    const variant = layout.getShredVariant(shred) orelse return error.UnknownShredVariant;
+    try proof.sanitize();
+    const proof_size = variant.proof_size;
+    if (proof.len != proof_size) {
+        return error.InvalidMerkleProof;
+    }
+    const offset = try proofOffset(variant.constants(), variant);
+    if (shred.len < offset + proof.len * merkle_proof_entry_size) {
+        return error.InvalidProofSize;
+    }
+    var start = offset;
+    var proof_iterator = proof.iterator();
+    while (proof_iterator.next()) |entry| {
+        // TODO test: agave uses bincode here. does that make any difference?
+        const end = merkle_proof_entry_size + start;
+        @memcpy(shred[start..end], entry);
+        start = end;
+    }
+}
+
+pub fn getMerkleNode(shred: []const u8) !Hash {
+    const variant = layout.getShredVariant(shred) orelse return error.UnknownShredVariant;
+    const offset = try proofOffset(variant.constants(), variant);
+    return getMerkleNodeAt(shred, Signature.size, offset);
+}
+
+fn getMerkleNodeAt(shred: []const u8, start: usize, end: usize) !Hash {
     if (shred.len < end) return error.InvalidPayloadSize;
     return hashv(&.{ MERKLE_HASH_PREFIX_LEAF, shred[start..end] });
 }
@@ -837,6 +851,16 @@ pub const MerkleProofEntryList = struct {
     }
 };

+/// agave: setRetransmitterSignature
+fn setRetransmitterSignatureFor(shred: []u8, variant: ShredVariant, signature: Signature) !void {
+    const offset = try retransmitterSignatureOffset(variant);
+    const end = offset + Signature.size;
+    if (shred.len < end) {
+        return error.InvalidPayloadSize;
+    }
+    @memcpy(shred[offset..end], &signature.data);
+}
+
 pub const CommonHeader = struct {
     leader_signature: Signature,
     variant: ShredVariant,
@@ -1137,17 +1161,9 @@ pub const layout = struct {
         return Hash.fromSizedSlice(shred[offset..][0..SIZE_OF_MERKLE_ROOT]);
     }

-    pub fn setRetransmitterSignature(
-        shred: []u8,
-        signature: Signature,
-    ) !void {
-        const variant = getShredVariant(shred) orelse return error.UnknownVariant;
-        const offset = try retransmitterSignatureOffset(variant);
-        const end = offset + Signature.size;
-        if (shred.len < end) {
-            return error.InvalidPayloadSize;
-        }
-        @memcpy(shred[offset..end], &signature.data);
+    pub fn setRetransmitterSignature(shred: []u8, signature: Signature) !void {
+        const variant = getShredVariant(shred) orelse return error.UnknownShredVariant;
+        return setRetransmitterSignatureFor(shred, variant, signature);
     }

     /// agave: get_reference_tick
@@ -1172,6 +1188,33 @@ fn getInt(
     return std.mem.readInt(Int, bytes, .little);
 }

+pub fn overwriteShredForTest(allocator: Allocator, shred: *Shred, data: []const u8) !void {
+    const constants = switch (shred.*) {
+        .code => code_shred_constants,
+        .data => data_shred_constants,
+    };
+    if (shred.payload().len < constants.payload_size) {
+        return error.InvalidPayloadSize;
+    }
+    const new_payload = try allocator.dupe(u8, shred.payload());
+    @memset(new_payload, 0);
+
+    var buf = std.io.fixedBufferStream(new_payload[0..constants.payload_size]);
+    const writer = buf.writer();
+
+    switch (shred.*) {
+        inline .code, .data => |*typed_shred| {
+            try bincode.write(writer, typed_shred.common, .{});
+            try bincode.write(writer, typed_shred.custom, .{});
+            allocator.free(typed_shred.payload);
+            typed_shred.payload = new_payload;
+        },
+    }
+
+    const offset = writer.context.pos;
+    @memcpy(new_payload[offset .. offset + data.len], data);
+}
+
 test "basic shred variant round trip" {
     try testShredVariantRoundTrip(0x4C, .{
         .shred_type = .code,
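The merkle helpers that were methods on the comptime-generated shred type are now public free functions over raw payload bytes, resolving the variant through `layout.getShredVariant`. That is what lets recovery (two files below) operate on payload slices without holding a `*Shred`. A sketch under the signatures added above, assuming a mutable `payload: []u8` and a sanitized `proof: MerkleProofEntryList`:

```zig
// Sketch of the free-function merkle API over a raw payload slice; all three
// calls derive their offsets from the variant byte embedded in the payload.
fn reprove(payload: []u8, proof: MerkleProofEntryList) !Hash {
    try setMerkleProof(payload, proof); // write entries at the variant's proof offset
    _ = try getMerkleProof(payload); // read them back out
    return getMerkleNode(payload); // leaf hash over the signed region
}
```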
diff --git a/src/ledger/shred_inserter/merkle_root_checks.zig b/src/ledger/shred_inserter/merkle_root_checks.zig
index 2bab97408..fb5011738 100644
--- a/src/ledger/shred_inserter/merkle_root_checks.zig
+++ b/src/ledger/shred_inserter/merkle_root_checks.zig
@@ -80,11 +80,13 @@ pub const MerkleRootValidator = struct {
                 .shred_type = merkle_root_meta.first_received_shred_type,
             };
             if (try self.shreds.get(shred_id)) |conflicting_shred| {
+                defer conflicting_shred.deinit();
+                const original = try shred.clone();
+                errdefer original.deinit();
+                const conflict = try conflicting_shred.clone(self.allocator);
+                errdefer conflict.deinit();
                 try self.duplicate_shreds.append(.{
-                    .MerkleRootConflict = .{
-                        .original = shred.*, // TODO lifetimes (cloned in rust)
-                        .conflict = conflicting_shred,
-                    },
+                    .MerkleRootConflict = .{ .original = original, .conflict = conflict },
                 });
             } else {
                 self.logger.err().logf(&newlinesToSpaces(
@@ -210,7 +212,7 @@ pub const MerkleRootValidator = struct {
             );
             return true;
         };
-        errdefer other_shred.deinit();
+        defer other_shred.deinit();

         const older_shred, const newer_shred = switch (direction) {
             .forward => .{ shred.payload(), other_shred.data },
@@ -250,11 +252,13 @@ pub const MerkleRootValidator = struct {
         );

         if (!try self.duplicate_shreds.contains(slot)) {
+            const original = try shred.clone();
+            errdefer original.deinit();
+            const conflict = try other_shred.clone(self.allocator);
+            errdefer conflict.deinit();
             try self.duplicate_shreds.append(.{
-                .ChainedMerkleRootConflict = .{ .original = shred, .conflict = other_shred },
+                .ChainedMerkleRootConflict = .{ .original = original, .conflict = conflict },
             });
-        } else {
-            other_shred.deinit();
         }

         return false;
diff --git a/src/ledger/shred_inserter/recovery.zig b/src/ledger/shred_inserter/recovery.zig
index d63b6e8b8..f4b99ed9a 100644
--- a/src/ledger/shred_inserter/recovery.zig
+++ b/src/ledger/shred_inserter/recovery.zig
@@ -301,7 +301,8 @@ fn setMerkleProofs(
     var tree = try std.ArrayList(Hash).initCapacity(allocator, all_shreds.len);
     defer tree.deinit();
     for (all_shreds) |shred| {
-        tree.appendAssumeCapacity(try shred.merkleNode());
+        const merkle_node = try sig.ledger.shred.getMerkleNode(shred.payload());
+        tree.appendAssumeCapacity(merkle_node);
     }
     try makeMerkleTree(&tree);

@@ -319,7 +320,7 @@ fn setMerkleProofs(
             return error.InvalidMerkleProof;
         }
         if (was_present) {
-            const expected_proof = try shred.merkleProof();
+            const expected_proof = try sig.ledger.shred.getMerkleProof(shred.payload());
             var expected_proof_iterator = expected_proof.iterator();
             var i: usize = 0;
             while (expected_proof_iterator.next()) |expected_entry| : (i += 1) {
@@ -329,8 +330,8 @@
                 }
             }
         } else {
-            try shred.setMerkleProof(proof);
-            std.debug.assert(!std.meta.isError(shred.sanitize()));
+            try sig.ledger.shred.setMerkleProof(shred.payloadMut(), proof);
+            std.debug.assert(!std.meta.isError(shred.sanitize())); // TODO error somewhere else
             // TODO: Assert that shred payload is fully populated.
         }
     }
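The conflict paths above replace the old `// TODO lifetimes` aliasing with a consistent transfer-of-ownership idiom: deep-copy both sides, guard each copy with `errdefer`, then append. On success the `duplicate_shreds` list owns the copies; if the append itself fails, the `errdefer`s release them. The skeleton of the pattern, with names taken from `checkConsistency` above:

```zig
// Ownership-transfer skeleton used by the conflict paths above.
const original = try shred.clone(); // deep copy of the incoming shred
errdefer original.deinit(); // freed only if the append below fails
const conflict = try conflicting_shred.clone(self.allocator); // BytesRef heap copy
errdefer conflict.deinit();
try self.duplicate_shreds.append(.{
    .MerkleRootConflict = .{ .original = original, .conflict = conflict },
});
// From here on, deiniting the list (or the Result that wraps it) frees both copies.
```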
diff --git a/src/ledger/shred_inserter/shred_inserter.zig b/src/ledger/shred_inserter/shred_inserter.zig
index de24c494a..544706a66 100644
--- a/src/ledger/shred_inserter/shred_inserter.zig
+++ b/src/ledger/shred_inserter/shred_inserter.zig
@@ -80,9 +80,26 @@ pub const ShredInserter = struct {
         self.logger.deinit();
     }

-    pub const InsertShredsResult = struct {
+    pub const Options = struct {
+        /// Skip some validations for performance.
+        is_trusted: bool = false,
+        /// Necessary for shred recovery.
+        slot_leaders: ?SlotLeaders = null,
+        /// Send recovered shreds here if provided.
+        retransmit_sender: ?PointerClosure([]const []const u8, void) = null,
+        /// Records all shreds.
+        shred_tracker: ?*sig.shred_network.shred_tracker.BasicShredTracker = null,
+    };
+
+    pub const Result = struct {
         completed_data_set_infos: ArrayList(CompletedDataSetInfo),
         duplicate_shreds: ArrayList(PossibleDuplicateShred),
+
+        pub fn deinit(self: Result) void {
+            for (self.duplicate_shreds.items) |ds| ds.deinit();
+            self.completed_data_set_infos.deinit();
+            self.duplicate_shreds.deinit();
+        }
     };

     /// The main function that performs the shred insertion logic
@@ -140,16 +157,23 @@ pub const ShredInserter = struct {
     /// `CompletedDataSetInfo` and a vector of its corresponding index in the
     /// input `shreds` vector.
     ///
+    /// Lifetime rules:
+    /// - Shreds received over the network are owned by the caller to
+    ///   insertShreds. The ShredInserter must treat them as a reference that
+    ///   will die when insertShreds returns.
+    /// - Shreds returned from insertShreds are owned by the caller to
+    ///   insertShreds. This means the lifetime must exceed the insertShreds
+    ///   call.
+    /// - Any shreds created during insertShreds must be either returned or
+    ///   deinitialized before returning.
+    ///
     /// agave: do_insert_shreds
     pub fn insertShreds(
         self: *Self,
         shreds: []const Shred,
         is_repaired: []const bool,
-        maybe_slot_leaders: ?SlotLeaders,
-        is_trusted: bool,
-        retransmit_sender: ?PointerClosure([]const []const u8, void),
-        shred_tracker: ?*sig.shred_network.shred_tracker.BasicShredTracker,
-    ) !InsertShredsResult {
+        options: Options,
+    ) !Result {
         const timestamp = sig.time.Instant.now();
         ///////////////////////////
         // check inputs for validity and edge cases
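With the four trailing parameters folded into `Options`, simple callers pass `.{}` while full-featured callers name only the fields they need. A sketch mirroring the call that the shred_network processor at the end of this diff makes:

```zig
// Sketch mirroring src/shred_network/shred_processor.zig below.
const result = try shred_inserter.insertShreds(shreds.items, is_repaired.items, .{
    .slot_leaders = leader_schedule, // enables erasure recovery
    .shred_tracker = tracker, // registers every data shred it sees
});
result.deinit(); // the caller owns the Result, per the lifetime rules above
```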
@@ -186,12 +210,12 @@ pub const ShredInserter = struct {
         //
         var shred_insertion_timer = try Timer.start();
         var newly_completed_data_sets = ArrayList(CompletedDataSetInfo).init(allocator);
-        defer newly_completed_data_sets.deinit();
+        errdefer newly_completed_data_sets.deinit();
         for (shreds, is_repaired) |shred, is_repair| {
             const shred_source: ShredSource = if (is_repair) .repaired else .turbine;
             switch (shred) {
                 .data => |data_shred| {
-                    if (shred_tracker) |tracker| {
+                    if (options.shred_tracker) |tracker| {
                         tracker.registerDataShred(&shred.data, timestamp) catch |err| {
                             switch (err) {
                                 error.SlotUnderflow, error.SlotOverflow => {
@@ -206,8 +230,8 @@ pub const ShredInserter = struct {
                         &state,
                         merkle_root_validator,
                         write_batch,
-                        is_trusted,
-                        maybe_slot_leaders,
+                        options.is_trusted,
+                        options.slot_leaders,
                         shred_source,
                     )) |completed_data_sets| {
                         if (is_repair) {
@@ -237,7 +261,7 @@ pub const ShredInserter = struct {
                         &state,
                         merkle_root_validator,
                         write_batch,
-                        is_trusted,
+                        options.is_trusted,
                         shred_source,
                     );
                 },
@@ -251,7 +275,7 @@ pub const ShredInserter = struct {
         var shred_recovery_timer = try Timer.start();
         var valid_recovered_shreds = ArrayList([]const u8).init(allocator);
         defer valid_recovered_shreds.deinit();
-        if (maybe_slot_leaders) |slot_leaders| {
+        if (options.slot_leaders) |leaders| {
             var reed_solomon_cache = try ReedSolomonCache.init(allocator);
             defer reed_solomon_cache.deinit();
             const recovered_shreds = try self.tryShredRecovery(
@@ -271,7 +295,7 @@ pub const ShredInserter = struct {
                 if (shred == .data) {
                     self.metrics.num_recovered.inc();
                 }
-                const leader = slot_leaders.get(shred.commonHeader().slot);
+                const leader = leaders.get(shred.commonHeader().slot);
                 if (leader == null) {
                     continue;
                 }
@@ -283,10 +307,10 @@ pub const ShredInserter = struct {
                 // erasure batch, no need to store code shreds in
                 // blockstore.
                 if (shred == .code) {
-                    try valid_recovered_shreds.append(shred.payload()); // TODO lifetime
+                    try valid_recovered_shreds.append(shred.payload());
                     continue;
                 }
-                if (shred_tracker) |tracker| {
+                if (options.shred_tracker) |tracker| {
                     tracker.registerDataShred(&shred.data, timestamp) catch |err| {
                         switch (err) {
                             error.SlotUnderflow, error.SlotOverflow => {
@@ -301,14 +325,14 @@ pub const ShredInserter = struct {
                     &state,
                     merkle_root_validator,
                     write_batch,
-                    is_trusted,
-                    maybe_slot_leaders,
+                    options.is_trusted,
+                    options.slot_leaders,
                     .recovered,
                 )) |completed_data_sets| {
                     defer completed_data_sets.deinit();
                     try newly_completed_data_sets.appendSlice(completed_data_sets.items);
                     self.metrics.num_recovered_inserted.inc();
-                    try valid_recovered_shreds.append(shred.payload()); // TODO lifetime
+                    try valid_recovered_shreds.append(shred.payload());
                 } else |e| switch (e) {
                     error.Exists => self.metrics.num_recovered_exists.inc(),
                     error.InvalidShred => self.metrics.num_recovered_failed_invalid.inc(),
@@ -319,8 +343,10 @@ pub const ShredInserter = struct {
                     else => return e, // TODO explicit
                 }
             }
-            if (valid_recovered_shreds.items.len > 0) if (retransmit_sender) |sender| {
-                sender.call(valid_recovered_shreds.items); // TODO lifetime
+            if (valid_recovered_shreds.items.len > 0) if (options.retransmit_sender) |_| {
+                // TODO: This needs to be implemented differently, probably with
+                // a channel, and with proper consideration for lifetimes.
+                // sender.call(valid_recovered_shreds.items);
             };
         }
         self.metrics.shred_recovery_elapsed_us.add(shred_recovery_timer.read().asMicros());
@@ -405,7 +431,6 @@ pub const ShredInserter = struct {

         return .{
             .completed_data_set_infos = newly_completed_data_sets,
-            // TODO: ensure all duplicate shreds exceed lifetime of pending state
             .duplicate_shreds = state.duplicate_shreds,
         };
     }
@@ -480,7 +505,9 @@ pub const ShredInserter = struct {
         // dupes
         if (index_meta.code_index.contains(shred.common.index)) {
             self.metrics.num_code_shreds_exists.inc();
-            try state.duplicate_shreds.append(.{ .Exists = .{ .code = shred } });
+            const dupe = try shred.clone();
+            errdefer dupe.deinit();
+            try state.duplicate_shreds.append(.{ .Exists = .{ .code = dupe } });
             return false;
         }

@@ -543,10 +570,11 @@ pub const ShredInserter = struct {
             slot,
             erasure_meta,
         )) |conflicting_shred| {
+            defer conflicting_shred.deinit();
             // found the duplicate
             self.db.put(schema.duplicate_slots, slot, .{
                 .shred1 = conflicting_shred.data,
-                .shred2 = shred.payload,
+                .shred2 = try shred.allocator.dupe(u8, shred.payload),
             }) catch |e| {
                 // TODO: only log a database error?
                 self.logger.err().logf(
                     .{ slot, erasure_set_id, e },
                 );
             };
+            const original = try shred.clone();
+            errdefer original.deinit();
+            const conflict = try conflicting_shred.clone(self.allocator);
+            errdefer conflict.deinit();
             try state.duplicate_shreds.append(.{
-                .ErasureConflict = .{
-                    // TODO lifetimes
-                    .original = .{ .code = shred },
-                    .conflict = conflicting_shred,
-                },
+                .ErasureConflict = .{ .original = .{ .code = original }, .conflict = conflict },
             });
         } else {
             self.logger.err().logf(&newlinesToSpaces(
@@ -590,7 +618,7 @@ pub const ShredInserter = struct {
         _: CodeShred, // TODO: figure out why this is here. delete it or add what is missing.
         slot: Slot,
        erasure_meta: *const ErasureMeta,
-    ) !?BytesRef { // TODO consider lifetime
+    ) !?BytesRef { // TODO lifetime
        // Search for the shred which set the initial erasure config, either inserted,
        // or in the current batch in just_inserted_shreds.
        const index: u32 = @intCast(erasure_meta.first_received_code_index);
@@ -629,7 +657,11 @@ pub const ShredInserter = struct {

         if (!is_trusted) {
             if (isDataShredPresent(shred, slot_meta, &index_meta.data_index)) {
-                try state.duplicate_shreds.append(.{ .Exists = shred_union });
+                {
+                    const dupe = try shred_union.clone();
+                    errdefer dupe.deinit();
+                    try state.duplicate_shreds.append(.{ .Exists = dupe });
+                }
                 return error.Exists;
             }
             if (shred.isLastInSlot() and
@@ -669,11 +701,9 @@ pub const ShredInserter = struct {
                 // A previous shred has been inserted in this batch or in blockstore
                 // Compare our current shred against the previous shred for potential
                 // conflicts
-                if (!try merkle_root_validator.checkConsistency(
-                    slot,
-                    merkle_root_meta.asRef(),
-                    &shred_union,
-                )) {
+                if (!try merkle_root_validator
+                    .checkConsistency(slot, merkle_root_meta.asRef(), &shred_union))
+                {
                     return error.InvalidShred;
                 }
             }
@@ -762,7 +792,7 @@ pub const ShredInserter = struct {
             ), .{ shred_id, slot_meta });
             return false; // TODO: this is redundant
         };
-        errdefer ending_shred.deinit();
+        defer ending_shred.deinit();
         const dupe = meta.DuplicateSlotProof{
             .shred1 = ending_shred.data,
             .shred2 = shred.payload,
@@ -771,10 +801,13 @@ pub const ShredInserter = struct {
             // TODO: only log a database error?
             self.logger.err().logf("failed to store duplicate slot: {}", .{e});
         };
-        // FIXME data ownership
+        const original = try shred.clone();
+        errdefer original.deinit();
+        const conflict = try ending_shred.clone(self.allocator);
+        errdefer conflict.deinit();
         try duplicate_shreds.append(.{ .LastIndexConflict = .{
-            .original = .{ .data = shred },
-            .conflict = ending_shred,
+            .original = .{ .data = original },
+            .conflict = conflict,
         } });

@@ -908,7 +941,10 @@ pub const ShredInserter = struct {
         reed_solomon_cache: *ReedSolomonCache,
     ) !std.ArrayList(Shred) {
         var available_shreds = ArrayList(Shred).init(self.allocator);
-        defer available_shreds.deinit();
+        defer {
+            for (available_shreds.items) |shred| shred.deinit();
+            available_shreds.deinit();
+        }

         try getRecoveryShreds(
             self.allocator,
@@ -1181,7 +1217,7 @@ const ShredInserterTestState = struct {
     fn insertShredBytes(
         self: *ShredInserterTestState,
         shred_payloads: []const []const u8,
-    ) !ShredInserter.InsertShredsResult {
+    ) !ShredInserter.Result {
         const shreds = try self.allocator().alloc(Shred, shred_payloads.len);
         defer {
             for (shreds) |shred| shred.deinit();
@@ -1195,7 +1231,7 @@ const ShredInserterTestState = struct {
         for (0..shreds.len) |i| {
             is_repairs[i] = false;
         }
-        return self.inserter.insertShreds(shreds, is_repairs, null, false, null, null);
+        return self.inserter.insertShreds(shreds, is_repairs, .{});
     }

     fn checkInsertCodeShred(
@@ -1220,16 +1256,13 @@ const ShredInserterTestState = struct {
     }
 };

-pub fn insertShredsForTest(
-    inserter: *ShredInserter,
-    shreds: []const Shred,
-) !ShredInserter.InsertShredsResult {
+pub fn insertShredsForTest(inserter: *ShredInserter, shreds: []const Shred) !ShredInserter.Result {
     const is_repairs = try inserter.allocator.alloc(bool, shreds.len);
     defer inserter.allocator.free(is_repairs);
     for (0..shreds.len) |i| {
         is_repairs[i] = false;
     }
-    return inserter.insertShreds(shreds, is_repairs, null, false, null, null);
+    return inserter.insertShreds(shreds, is_repairs, .{});
 }

 test "insertShreds single shred" {
@@ -1238,7 +1271,8 @@ test "insertShreds single shred" {
     const allocator = std.testing.allocator;
     const shred = try Shred.fromPayload(allocator, &ledger.shred.test_data_shred);
     defer shred.deinit();
-    _ = try state.inserter.insertShreds(&.{shred}, &.{false}, null, false, null, null);
+    const result = try state.inserter.insertShreds(&.{shred}, &.{false}, .{});
+    result.deinit();
     const stored_shred = try state.db.getBytes(
         schema.data_shred,
         .{ shred.commonHeader().slot, shred.commonHeader().index },
@@ -1261,8 +1295,9 @@ test "insertShreds 100 shreds from mainnet" {
         const shred = try Shred.fromPayload(std.testing.allocator, payload);
         try shreds.append(shred);
     }
-    _ = try state.inserter
-        .insertShreds(shreds.items, &(.{false} ** shred_bytes.len), null, false, null, null);
+    const result = try state.inserter
+        .insertShreds(shreds.items, &(.{false} ** shred_bytes.len), .{});
+    result.deinit();
     for (shreds.items) |shred| {
         const bytes = try state.db.getBytes(
             schema.data_shred,
@@ -1289,7 +1324,8 @@ test "chaining basic" {
     };

     // insert slot 1
-    _ = try state.insertShredBytes(slots[1]);
+    var result = try state.insertShredBytes(slots[1]);
+    result.deinit();
     {
         var slot_meta: SlotMeta = (try state.db.get(state.allocator(), schema.slot_meta, 1)).?;
         defer slot_meta.deinit();
@@ -1300,7 +1336,8 @@ test "chaining basic" {
     }

     // insert slot 2
-    _ = try state.insertShredBytes(slots[2]);
+    result = try state.insertShredBytes(slots[2]);
+    result.deinit();
     {
         var slot_meta: SlotMeta = (try state.db.get(state.allocator(), schema.slot_meta, 1)).?;
         defer slot_meta.deinit();
@@ -1319,7 +1356,8 @@ test "chaining basic" {
     }

     // insert slot 0
-    _ = try state.insertShredBytes(slots[0]);
+    result = try state.insertShredBytes(slots[0]);
+    result.deinit();
     {
         var slot_meta: SlotMeta = (try state.db.get(state.allocator(), schema.slot_meta, 0)).?;
         defer slot_meta.deinit();
@@ -1408,7 +1446,13 @@ test "merkle root metas coding" {
         &state.db,
         metrics,
     );
-    defer insert_state.deinit();
+    defer {
+        insert_state.deinit();
+        for (insert_state.duplicate_shreds.items) |shred| {
+            shred.deinit();
+        }
+        insert_state.duplicate_shreds.deinit();
+    }
     {
         // second shred (same index as first, should conflict with merkle root)
         var write_batch = try state.db.initWriteBatch();
@@ -1447,12 +1491,7 @@ test "merkle root metas coding" {
         try state.db.commit(&write_batch);
     }

-    for (insert_state.duplicate_shreds.items) |duplicate| {
-        switch (duplicate) {
-            .Exists => {},
-            inline else => |sc| sc.conflict.deinit(),
-        }
-    }
+    for (insert_state.duplicate_shreds.items) |duplicate| duplicate.deinit();
     insert_state.duplicate_shreds.clearRetainingCapacity();
     {
         // third shred (different index, should succeed)
@@ -1518,14 +1557,12 @@ test "recovery" {
     defer allocator.free(is_repairs);
     for (0..code_shreds.len) |i| is_repairs[i] = false;

-    _ = try state.inserter.insertShreds(
+    const result = try state.inserter.insertShreds(
         code_shreds,
         is_repairs,
-        leader_schedule.provider(),
-        false,
-        null,
-        null,
+        .{ .slot_leaders = leader_schedule.provider() },
     );
+    result.deinit();

     for (data_shreds) |data_shred| {
         const key = .{ data_shred.data.common.slot, data_shred.data.common.index };
diff --git a/src/ledger/shred_inserter/working_state.zig b/src/ledger/shred_inserter/working_state.zig
index 112a9ecb5..2147715c8 100644
--- a/src/ledger/shred_inserter/working_state.zig
+++ b/src/ledger/shred_inserter/working_state.zig
@@ -112,13 +112,13 @@ pub const PendingInsertShredsState = struct {
         };
     }

+    /// duplicate_shreds is not deinitialized; ownership is transferred to the caller.
     pub fn deinit(self: *Self) void {
         self.just_inserted_shreds.deinit();
         self.erasure_metas.deinit();
         self.merkle_root_metas.deinit();
         deinitMapRecursive(&self.slot_meta_working_set);
         deinitMapRecursive(&self.index_working_set);
-        self.duplicate_shreds.deinit();
         self.write_batch.deinit();
     }

@@ -127,7 +127,6 @@ pub const PendingInsertShredsState = struct {
         var timer = try Timer.start();
         const entry = try self.index_working_set.getOrPut(slot);
         if (!entry.found_existing) {
-            // TODO lifetimes (conflicting?)
             if (try self.db.get(self.allocator, schema.index, slot)) |item| {
                 entry.value_ptr.* = .{ .index = item };
             } else {
@@ -477,16 +476,32 @@ pub const SlotMetaWorkingSetEntry = struct {
 };

 pub const PossibleDuplicateShred = union(enum) {
-    Exists: Shred, // Blockstore has another shred in its spot
-    LastIndexConflict: ShredConflict, // The index of this shred conflicts with `slot_meta.last_index`
-    ErasureConflict: ShredConflict, // The code shred has a conflict in the erasure_meta
-    MerkleRootConflict: ShredConflict, // Merkle root conflict in the same fec set
-    ChainedMerkleRootConflict: ShredConflict, // Merkle root chaining conflict with previous fec set
+    /// Blockstore has another shred in its spot
+    Exists: Shred,
+    /// The index of this shred conflicts with `slot_meta.last_index`
+    LastIndexConflict: ShredConflict,
+    /// The code shred has a conflict in the erasure_meta
+    ErasureConflict: ShredConflict,
+    /// Merkle root conflict in the same fec set
+    MerkleRootConflict: ShredConflict,
+    /// Merkle root chaining conflict with previous fec set
+    ChainedMerkleRootConflict: ShredConflict,
+
+    pub fn deinit(self: PossibleDuplicateShred) void {
+        switch (self) {
+            inline else => |conflict| conflict.deinit(),
+        }
+    }
 };

 const ShredConflict = struct {
     original: Shred,
     conflict: BytesRef,
+
+    pub fn deinit(self: ShredConflict) void {
+        self.original.deinit();
+        self.conflict.deinit();
+    }
 };
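Because `deinit` is now defined on `PossibleDuplicateShred` (dispatching to `ShredConflict.deinit`, which frees both the cloned shred and the cloned bytes), owners can release a whole batch uniformly instead of switching on the variant as the old test code did. From the reworked "merkle root metas coding" test above:

```zig
// From the test above: one uniform cleanup, no per-variant switch needed.
for (insert_state.duplicate_shreds.items) |duplicate| duplicate.deinit();
insert_state.duplicate_shreds.clearRetainingCapacity();
```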
 pub const ShredWorkingStore = struct {
@@ -497,6 +512,7 @@ pub const ShredWorkingStore = struct {
     const Self = @This();

     /// returned shred lifetime does not exceed this struct
+    /// you should call deinit on the returned data
     pub fn get(self: Self, id: ShredId) !?BytesRef {
         if (self.just_inserted_shreds.get(id)) |shred| {
             return .{ .data = shred.payload(), .deinitializer = null };
@@ -507,7 +523,7 @@ pub const ShredWorkingStore = struct {
         };
     }

-    // TODO consider lifetime -> return may be owned by different contexts
+    /// Returned shred is owned by the caller (you must deinit it)
     /// This does almost the same thing as `get` and may not actually be necessary.
     /// This just adds a check on the index and evaluates the cf at comptime instead of runtime.
     pub fn getWithIndex(
@@ -524,7 +540,7 @@ pub const ShredWorkingStore = struct {
         };
         const id = ShredId{ .slot = slot, .index = @intCast(shred_index), .shred_type = shred_type };
         return if (self.just_inserted_shreds.get(id)) |shred|
-            shred
+            try shred.clone() // TODO perf - avoid clone without causing memory issues
         else if (index.contains(shred_index)) blk: {
             const shred = try self.db.getBytes(cf, .{ slot, @intCast(id.index) }) orelse {
                 self.logger.err().logf(&newlinesToSpaces(
@@ -540,10 +556,7 @@ pub const ShredWorkingStore = struct {
     }

     fn getFromDb(self: Self, comptime cf: ColumnFamily, id: ShredId) !?BytesRef {
-        return if (try self.db.getBytes(cf, .{ id.slot, @intCast(id.index) })) |s|
-            s
-        else
-            null;
+        return try self.db.getBytes(cf, .{ id.slot, @intCast(id.index) });
     }
 };
diff --git a/src/ledger/tests.zig b/src/ledger/tests.zig
index 57aa2f239..aef3f6ffa 100644
--- a/src/ledger/tests.zig
+++ b/src/ledger/tests.zig
@@ -15,7 +15,9 @@ const DirectPrintLogger = sig.trace.DirectPrintLogger;
 const Logger = sig.trace.Logger;
 const SlotMeta = ledger.meta.SlotMeta;
 const VersionedTransactionWithStatusMeta = ledger.reader.VersionedTransactionWithStatusMeta;
+
 const comptimePrint = std.fmt.comptimePrint;
+const insertShredsForTest = ledger.shred_inserter.shred_inserter.insertShredsForTest;

 const schema = ledger.schema.schema;

@@ -397,9 +399,12 @@ pub fn insertDataForBlockTest(state: *TestState) !InsertDataForBlockResult {
         deinitShreds(allocator, slice);
     };

-    _ = try ledger.shred_inserter.shred_inserter.insertShredsForTest(&inserter, shreds);
-    _ = try ledger.shred_inserter.shred_inserter.insertShredsForTest(&inserter, more_shreds);
-    _ = try ledger.shred_inserter.shred_inserter.insertShredsForTest(&inserter, unrooted_shreds);
+    var result = try insertShredsForTest(&inserter, shreds);
+    result.deinit();
+    result = try insertShredsForTest(&inserter, more_shreds);
+    result.deinit();
+    result = try insertShredsForTest(&inserter, unrooted_shreds);
+    result.deinit();

     try writer.setRoots(&.{ slot - 1, slot, slot + 1 });
diff --git a/src/shred_network/repair_service.zig b/src/shred_network/repair_service.zig
index 956a2ad78..d491ee420 100644
--- a/src/shred_network/repair_service.zig
+++ b/src/shred_network/repair_service.zig
@@ -407,7 +407,7 @@ pub const AddressedRepairRequest = struct {
 };

 /// How many slots to cache in RepairPeerProvider
-const REPAIR_PEERS_CACHE_CAPACITY: usize = 128;
+const REPAIR_PEERS_CACHE_CAPACITY: usize = 8;

 /// Maximum age of a cache item to use for repair peers
 const REPAIR_PEERS_CACHE_TTL_SECONDS: u64 = 10;
diff --git a/src/shred_network/shred_processor.zig b/src/shred_network/shred_processor.zig
index ff890f9c2..496102ef8 100644
--- a/src/shred_network/shred_processor.zig
+++ b/src/shred_network/shred_processor.zig
@@ -46,6 +46,7 @@ pub fn runShredProcessor(
         shreds.clearRetainingCapacity();
         is_repaired.clearRetainingCapacity();
+        defer for (shreds.items) |shred| shred.deinit();
         while (verified_shred_receiver.tryReceive()) |packet| {
             const shred_payload = layout.getShred(&packet) orelse return error.InvalidVerifiedShred;
             const shred = try shreds.addOne(allocator);
@@ -62,14 +63,15 @@ pub fn runShredProcessor(
         }
         metrics.insertion_batch_size.observe(shreds.items.len);
         metrics.passed_to_inserter_count.add(shreds.items.len);
-        _ = try shred_inserter.insertShreds(
+        const result = try shred_inserter.insertShreds(
             shreds.items,
             is_repaired.items,
-            leader_schedule,
-            false,
-            null,
-            tracker,
+            .{
+                .slot_leaders = leader_schedule,
+                .shred_tracker = tracker,
+            },
         );
+        result.deinit();
     }
 }