From 4467795c4a06380676ece748a62a2032007dd81c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jon=20B=C3=B6rjesson?= Date: Fri, 7 Feb 2025 09:27:22 +0100 Subject: [PATCH 1/2] Make #resize able to increase size --- spec/mfile_spec.cr | 22 ++++++++++++++++++++++ src/lavinmq/mfile.cr | 14 +++++++++++--- 2 files changed, 33 insertions(+), 3 deletions(-) diff --git a/spec/mfile_spec.cr b/spec/mfile_spec.cr index 0fddbecd3b..0d3d5a6e70 100644 --- a/spec/mfile_spec.cr +++ b/spec/mfile_spec.cr @@ -29,4 +29,26 @@ describe MFile do file.delete end end + + describe "#resize" do + it "can increase size" do + file = File.tempfile "mfile_spec" + file.print "hello world" + file.flush + data = "foo" + initial_size = file.size + MFile.open(file.path, initial_size) do |mfile| + mfile.capacity.should eq initial_size + expect_raises(IO::EOFError) { mfile.write data.to_slice } + mfile.resize(mfile.size + data.bytesize) + mfile.write data.to_slice + mfile.capacity.should eq(initial_size + data.bytesize) + end + file.size.should eq(initial_size + data.bytesize) + data = File.read(file.path) + data.should eq "hello worldfoo" + ensure + file.try &.delete + end + end end diff --git a/src/lavinmq/mfile.cr b/src/lavinmq/mfile.cr index 8101b7d5c3..17fcc7f344 100644 --- a/src/lavinmq/mfile.cr +++ b/src/lavinmq/mfile.cr @@ -271,9 +271,17 @@ class MFile < IO end def resize(new_size : Int) : Nil - raise ArgumentError.new("Can't expand file larger than capacity, use truncate") if new_size > @capacity - @size = new_size.to_i64 - @pos = new_size.to_i64 if @pos > new_size + if new_size > @capacity + raise File::Error.new("Can't resize readonly file", file: @path) if @readonly + unsafe_unmap + @capacity = new_size.to_i64 + code = LibC.ftruncate(@fd, @capacity) + raise File::Error.from_errno("Error truncating file", file: @path) if code < 0 + @buffer = mmap + else + @size = new_size.to_i64 + @pos = new_size.to_i64 if @pos > new_size + end end # Read from a specific position in the file From 9ee24e6b2903cff3d3dca40ab26347f7247ea105 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jon=20B=C3=B6rjesson?= Date: Fri, 7 Feb 2025 13:40:58 +0100 Subject: [PATCH 2/2] Use MFiles in follower Close and unmap files that haven't been accessed for a moment --- src/lavinmq/clustering/client.cr | 65 +++++++++++++++++++++++++++++++- 1 file changed, 63 insertions(+), 2 deletions(-) diff --git a/src/lavinmq/clustering/client.cr b/src/lavinmq/clustering/client.cr index 95b5e90b58..1a8727f79a 100644 --- a/src/lavinmq/clustering/client.cr +++ b/src/lavinmq/clustering/client.cr @@ -1,10 +1,62 @@ require "../clustering" require "../clustering/proxy" require "../data_dir_lock" +require "../mfile" require "lz4" module LavinMQ module Clustering + class ReplicatedFile + getter last_access = Time.monotonic + getter path : String + + @file : MFile + + def initialize(@path : String, @capacity : Int64) + # Ensure the file exists + @file = MFile.new(@path, @capacity) + end + + def write(data : Bytes) + @last_access = Time.monotonic + resize_to_fit(data.size) + @file.write data + end + + def resize_to_fit(bytes_to_fit) + new_capacity = size + bytes_to_fit + if new_capacity > @capacity + @file.resize new_capacity + @capacity = new_capacity + end + end + + def size + @file.try &.size || File.size(@path) + end + + def rename(new_filename) + @last_access = Time.monotonic + File.rename @path, new_filename + @path = new_filename + end + + def delete + @last_access = Time.monotonic + File.delete @path + end + + def close + @last_access = Time.monotonic + @file.close + end + + private def with_file(&) + @last_access = Time.monotonic + yield (@file ||= MFile.new(@path, @capacity)) + end + end + class Client Log = LavinMQ::Log.for "clustering.client" @data_dir_lock : DataDirLock @@ -19,10 +71,18 @@ module LavinMQ def initialize(@config : Config, @id : Int32, @password : String, proxy = true) System.maximize_fd_limit @data_dir = config.data_dir - @files = Hash(String, File).new do |h, k| + @files = Hash(String, ReplicatedFile).new do |h, k| path = File.join(@data_dir, k) Dir.mkdir_p File.dirname(path) - h[k] = File.open(path, "a").tap &.sync = true + h[k] = ReplicatedFile.new(path, Config.instance.segment_size) + h.reject! do |_, f| + if f.last_access < (Time.monotonic - 10.seconds) + f.close + true + end + false + end + h[k] end Dir.mkdir_p @data_dir @data_dir_lock = DataDirLock.new(@data_dir).tap &.acquire @@ -213,6 +273,7 @@ module LavinMQ when .negative? # append bytes to file Log.debug { "Appending #{len.abs} bytes to #{filename}" } f = @files[filename] + f.resize_to_fit(len.abs) IO.copy(lz4, f, len.abs) == len.abs || raise IO::EOFError.new("Full append not received") when .zero? # file is deleted Log.debug { "Deleting #{filename}" }