Skip to content

Commit

Permalink
Fix write buffers concurrent writes
Browse files Browse the repository at this point in the history
  • Loading branch information
astrada committed Sep 1, 2019
1 parent 4a35399 commit ed00948
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 32 deletions.
28 changes: 12 additions & 16 deletions src/buffering.ml
Original file line number Diff line number Diff line change
Expand Up @@ -299,19 +299,16 @@ struct

let flush_block key buffers =
let block_opt =
Utils.with_lock buffers.mutex
(fun () ->
Utils.safe_find buffers.blocks key
) in
Utils.safe_find buffers.blocks key in
flush key block_opt buffers

let flush_blocks remote_id buffers =
Utils.log_with_header
"BEGIN: Flushing memory buffers (remote id=%s)\n%!"
remote_id;
let blocks =
Utils.with_lock buffers.mutex
(fun () ->
Utils.with_lock buffers.mutex
(fun () ->
let blocks =
match Utils.safe_find buffers.file_block_indexes remote_id with
| None -> []
| Some is ->
Expand All @@ -320,12 +317,13 @@ struct
(i, Utils.safe_find buffers.blocks (remote_id, i))
)
is
) in
List.iter
(fun (block_index, block_opt) ->
flush (remote_id, block_index) block_opt buffers
)
blocks;
in
List.iter
(fun (block_index, block_opt) ->
flush (remote_id, block_index) block_opt buffers
)
blocks
);
Utils.log_with_header
"END: Flushing memory buffers (remote id=%s)\n%!"
remote_id
Expand Down Expand Up @@ -672,9 +670,7 @@ struct
Int64.mul block_size
(Int64.succ (Int64.div total_written_bytes block_size)) in
let write block_index dest_offset src_arr =
let block =
get_block block_index remote_id resource_size buffers in
Utils.with_lock block.Block.buffer.BufferPool.Buffer.mutex
Utils.with_lock buffers.mutex
(fun () ->
let block =
get_block block_index remote_id resource_size buffers in
Expand Down
37 changes: 21 additions & 16 deletions src/drive.ml
Original file line number Diff line number Diff line change
Expand Up @@ -1315,6 +1315,16 @@ let download_media media_download fileId =
end else
handle_default_exceptions e)

let flush_memory_buffers resource =
let context = Context.get_ctx () in
let config = context |. Context.config_lens in
if config.Config.write_buffers then begin
let memory_buffers = context.Context.memory_buffers in
Buffering.MemoryBuffers.flush_blocks
(resource.CacheData.Resource.remote_id |> Option.get)
memory_buffers
end

let download_resource resource =
let context = Context.get_ctx () in
let cache = context.Context.cache in
Expand Down Expand Up @@ -1417,12 +1427,12 @@ let download_resource resource =
do_download_with_lock ()
in
begin match reloaded_state with
CacheData.Resource.State.Synchronized
| CacheData.Resource.State.Synchronized
| CacheData.Resource.State.ToUpload
| CacheData.Resource.State.Uploading ->
if Sys.file_exists content_path then
if Sys.file_exists content_path then begin
SessionM.return ()
else
end else
do_download_with_lock ()
| CacheData.Resource.State.ToDownload ->
download_if_not_updated ()
Expand Down Expand Up @@ -1529,14 +1539,15 @@ let get_attr path =
begin if CacheData.Resource.is_document resource &&
config.Config.download_docs then
Utils.try_with_m
(with_retry download_resource resource)
(flush_memory_buffers resource;
with_retry download_resource resource)
(function
File_not_found -> SessionM.return ""
| e -> Utils.raise_m e)
else
SessionM.return ""
end >>= fun content_path ->
SessionM.return (resource, content_path)
SessionM.return (resource, content_path)
in

if path = root_directory then
Expand Down Expand Up @@ -1920,8 +1931,10 @@ let read path buf offset file_descr =
else
with_retry (stream_resource offset buf) resource >>= fun () ->
SessionM.return ""
else
else begin
flush_memory_buffers resource;
with_retry download_resource resource
end
in
let build_read_ahead_requests =
Expand Down Expand Up @@ -2085,16 +2098,6 @@ let upload resource =
shrink_cache ();
SessionM.return ()
let flush_memory_buffers resource =
let context = Context.get_ctx () in
let config = context |. Context.config_lens in
if config.Config.write_buffers then begin
let memory_buffers = context.Context.memory_buffers in
Buffering.MemoryBuffers.flush_blocks
(resource.CacheData.Resource.remote_id |> Option.get)
memory_buffers
end
let upload_resource_with_retry resource =
flush_memory_buffers resource;
with_retry (try_with_default upload) resource
Expand Down Expand Up @@ -2449,6 +2452,7 @@ let rename path new_path =
"BEGIN: Replacing content of file %s (remote id=%s) with content \
of file %s (remote id=%s)\n%!"
new_name target_remote_id old_name remote_id;
flush_memory_buffers resource;
with_retry download_resource resource >>= fun content_path ->
let file_patch =
{ File.empty with
Expand Down Expand Up @@ -2628,6 +2632,7 @@ let truncate path size =
let (path_in_cache, trashed) = get_path_in_cache path config in
let truncate_resource =
get_resource path_in_cache trashed >>= fun resource ->
flush_memory_buffers resource;
with_retry download_resource resource >>= fun content_path ->
let remote_id = resource |. CacheData.Resource.remote_id |> Option.get in
Utils.log_with_header "BEGIN: Truncating file (remote id=%s)\n%!" remote_id;
Expand Down

0 comments on commit ed00948

Please sign in to comment.