Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use MFiles in followers #934

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions spec/mfile_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,26 @@ describe MFile do
file.delete
end
end

describe "#resize" do
it "can increase size" do
file = File.tempfile "mfile_spec"
file.print "hello world"
file.flush
data = "foo"
initial_size = file.size
MFile.open(file.path, initial_size) do |mfile|
mfile.capacity.should eq initial_size
expect_raises(IO::EOFError) { mfile.write data.to_slice }
mfile.resize(mfile.size + data.bytesize)
mfile.write data.to_slice
mfile.capacity.should eq(initial_size + data.bytesize)
end
file.size.should eq(initial_size + data.bytesize)
data = File.read(file.path)
data.should eq "hello worldfoo"
ensure
file.try &.delete
end
end
end
65 changes: 63 additions & 2 deletions src/lavinmq/clustering/client.cr
Original file line number Diff line number Diff line change
@@ -1,10 +1,62 @@
require "../clustering"
require "../clustering/proxy"
require "../data_dir_lock"
require "../mfile"
require "lz4"

module LavinMQ
module Clustering
class ReplicatedFile
getter last_access = Time.monotonic
getter path : String

@file : MFile

def initialize(@path : String, @capacity : Int64)
# Ensure the file exists
@file = MFile.new(@path, @capacity)
end

def write(data : Bytes)
@last_access = Time.monotonic
resize_to_fit(data.size)
@file.write data
end

def resize_to_fit(bytes_to_fit)
new_capacity = size + bytes_to_fit
if new_capacity > @capacity
@file.resize new_capacity
@capacity = new_capacity
end
end

def size
@file.try &.size || File.size(@path)
end

def rename(new_filename)
@last_access = Time.monotonic
File.rename @path, new_filename
@path = new_filename
end

def delete
@last_access = Time.monotonic
File.delete @path
end

def close
@last_access = Time.monotonic
@file.close
end

private def with_file(&)
@last_access = Time.monotonic
yield (@file ||= MFile.new(@path, @capacity))
end
end

class Client
Log = LavinMQ::Log.for "clustering.client"
@data_dir_lock : DataDirLock
Expand All @@ -19,10 +71,18 @@ module LavinMQ
def initialize(@config : Config, @id : Int32, @password : String, proxy = true)
System.maximize_fd_limit
@data_dir = config.data_dir
@files = Hash(String, File).new do |h, k|
@files = Hash(String, ReplicatedFile).new do |h, k|
path = File.join(@data_dir, k)
Dir.mkdir_p File.dirname(path)
h[k] = File.open(path, "a").tap &.sync = true
h[k] = ReplicatedFile.new(path, Config.instance.segment_size)
h.reject! do |_, f|
if f.last_access < (Time.monotonic - 10.seconds)
f.close
true
end
false
end
h[k]
end
Dir.mkdir_p @data_dir
@data_dir_lock = DataDirLock.new(@data_dir).tap &.acquire
Expand Down Expand Up @@ -213,6 +273,7 @@ module LavinMQ
when .negative? # append bytes to file
Log.debug { "Appending #{len.abs} bytes to #{filename}" }
f = @files[filename]
f.resize_to_fit(len.abs)
IO.copy(lz4, f, len.abs) == len.abs || raise IO::EOFError.new("Full append not received")
when .zero? # file is deleted
Log.debug { "Deleting #{filename}" }
Expand Down
14 changes: 11 additions & 3 deletions src/lavinmq/mfile.cr
Original file line number Diff line number Diff line change
Expand Up @@ -271,9 +271,17 @@ class MFile < IO
end

def resize(new_size : Int) : Nil
raise ArgumentError.new("Can't expand file larger than capacity, use truncate") if new_size > @capacity
@size = new_size.to_i64
@pos = new_size.to_i64 if @pos > new_size
if new_size > @capacity
raise File::Error.new("Can't resize readonly file", file: @path) if @readonly
unsafe_unmap
@capacity = new_size.to_i64
code = LibC.ftruncate(@fd, @capacity)
raise File::Error.from_errno("Error truncating file", file: @path) if code < 0
@buffer = mmap
else
@size = new_size.to_i64
@pos = new_size.to_i64 if @pos > new_size
end
end

# Read from a specific position in the file
Expand Down
Loading