Skip to content

Commit

Permalink
Merge pull request #15645 from daos-stack/wangdi/google_26_dfuse
Browse files Browse the repository at this point in the history
patches series for dfuse to support simultaneous reads
  • Loading branch information
jolivier23 authored Dec 29, 2024
2 parents 86279b1 + c171530 commit 1144da5
Show file tree
Hide file tree
Showing 16 changed files with 1,004 additions and 233 deletions.
1 change: 0 additions & 1 deletion docs/user/filesystem.md
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,6 @@ Additionally, there are several optional command-line options:
| --container=<label\|uuid\> | container label or uuid to open |
| --sys-name=<name\> | DAOS system name |
| --foreground | run in foreground |
| --singlethreaded | run single threaded |
| --thread-count=<count> | Number of threads to use |
| --multi-user | Run in multi user mode |
| --read-only | Mount in read-only mode |
Expand Down
1 change: 1 addition & 0 deletions src/client/dfuse/SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ DFUSE_SRC = ['dfuse_core.c',
'dfuse_main.c',
'dfuse_fuseops.c',
'inval.c',
'file.c',
'dfuse_cont.c',
'dfuse_thread.c',
'dfuse_pool.c']
Expand Down
78 changes: 67 additions & 11 deletions src/client/dfuse/dfuse.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ struct dfuse_info {
char *di_mountpoint;
int32_t di_thread_count;
uint32_t di_eq_count;
bool di_threaded;
bool di_foreground;
bool di_caching;
bool di_multi_user;
Expand Down Expand Up @@ -96,6 +95,9 @@ struct dfuse_eq {
* memory consumption */
#define DFUSE_MAX_PRE_READ (1024 * 1024 * 4)

/* Maximum file-size for pre-read in all cases */
#define DFUSE_MAX_PRE_READ_ONCE (1024 * 1024 * 1)

/* Launch fuse, and do not return until complete */
int
dfuse_launch_fuse(struct dfuse_info *dfuse_info, struct fuse_args *args);
Expand Down Expand Up @@ -137,9 +139,10 @@ struct dfuse_inode_entry;
* when EOF is returned to the kernel. If it's still present on release then it's freed then.
*/
struct dfuse_pre_read {
pthread_mutex_t dra_lock;
d_list_t req_list;
struct dfuse_event *dra_ev;
int dra_rc;
bool complete;
};

/** what is returned as the handle for fuse fuse_file_info on create/open/opendir */
Expand All @@ -149,8 +152,6 @@ struct dfuse_obj_hdl {
/** the DFS object handle. Not created for directories. */
dfs_obj_t *doh_obj;

struct dfuse_pre_read *doh_readahead;

/** the inode entry for the file */
struct dfuse_inode_entry *doh_ie;

Expand All @@ -169,17 +170,24 @@ struct dfuse_obj_hdl {
/* Pointer to the last returned drc entry */
struct dfuse_readdir_c *doh_rd_nextc;

/* Linear read function, if a file is read from start to end then this normally requires
* a final read request at the end of the file that returns zero bytes. Detect this case
* and when the final read is detected then just return without a round trip.
* Store a flag for this being enabled (starts as true, but many I/O patterns will set it
* to false), the expected position of the next read and a boolean for if EOF has been
* detected.
/* Linear read tracking. If a file is opened and read from start to finish then this is
* called a linear read, linear reads however may or may not read EOF at the end of a file,
* as the reader may be checking the file size.
*
* Detect this case and track it at the file handle level, this is then used in two places:
* For read of EOF it means the round-trip can be avoided.
* On release we can use this flag to apply a setting to the directory inode.
*
* This flag starts enabled and many I/O patterns will disable it. We also store the next
* expected read position and if EOF has been reached.
*/

off_t doh_linear_read_pos;
bool doh_linear_read;
bool doh_linear_read_eof;

bool doh_set_linear_read;

/** True if caching is enabled for this file. */
bool doh_caching;

Expand All @@ -197,6 +205,8 @@ struct dfuse_obj_hdl {
bool doh_kreaddir_finished;

bool doh_evict_on_close;
/* the handle is doing readhead for the moment */
bool doh_readahead_inflight;
};

/* Readdir support.
Expand Down Expand Up @@ -401,11 +411,20 @@ struct dfuse_event {
d_iov_t de_iov;
d_sg_list_t de_sgl;
d_list_t de_list;

/* Position in a list of events, this will either be off active->open_reads or
* de->de_read_slaves.
*/
d_list_t de_read_list;
/* List of slave events */
d_list_t de_read_slaves;
struct dfuse_eq *de_eqt;
union {
struct dfuse_obj_hdl *de_oh;
struct dfuse_inode_entry *de_ie;
struct read_chunk_data *de_cd;
};
struct dfuse_info *de_di;
off_t de_req_position; /**< The file position requested by fuse */
union {
size_t de_req_len;
Expand Down Expand Up @@ -1009,10 +1028,32 @@ struct dfuse_inode_entry {
*/
ATOMIC bool ie_linear_read;

struct active_inode *ie_active;

/* Entry on the evict list */
d_list_t ie_evict_entry;
};

struct active_inode {
d_list_t chunks;
d_list_t open_reads;
pthread_spinlock_t lock;
ATOMIC uint64_t read_count;
struct dfuse_pre_read *readahead;
};

/* Increase active count on inode. This takes a reference and allocates ie->active as required */
int
active_ie_init(struct dfuse_inode_entry *ie);

/* Mark a oh as closing and drop the ref on inode active */
void
active_oh_decref(struct dfuse_info *dfuse_info, struct dfuse_obj_hdl *oh);

/* Decrease active count on inode, called on error where there is no oh */
void
active_ie_decref(struct dfuse_info *dfuse_info, struct dfuse_inode_entry *ie);

/* Flush write-back cache writes to a inode. It does this by waiting for and then releasing an
* exclusive lock on the inode. Writes take a shared lock so this will block until all pending
* writes are complete.
Expand Down Expand Up @@ -1108,6 +1149,13 @@ dfuse_compute_inode(struct dfuse_cont *dfs,
void
dfuse_cache_evict_dir(struct dfuse_info *dfuse_info, struct dfuse_inode_entry *ie);

/* Free any read chunk data for an inode.
*
* Returns true if feature was used.
*/
bool
read_chunk_close(struct dfuse_inode_entry *ie);

/* Metadata caching functions. */

/* Mark the cache as up-to-date from now */
Expand Down Expand Up @@ -1171,7 +1219,15 @@ bool
dfuse_dcache_get_valid(struct dfuse_inode_entry *ie, double max_age);

void
dfuse_pre_read(struct dfuse_info *dfuse_info, struct dfuse_obj_hdl *oh);
dfuse_pre_read(struct dfuse_info *dfuse_info, struct dfuse_obj_hdl *oh, struct dfuse_event *ev);

int
dfuse_pre_read_init(struct dfuse_info *dfuse_info, struct dfuse_inode_entry *ie,
struct dfuse_event **evp);

void
dfuse_pre_read_abort(struct dfuse_info *dfuse_info, struct dfuse_obj_hdl *oh,
struct dfuse_event *ev, int rc);

int
check_for_uns_ep(struct dfuse_info *dfuse_info, struct dfuse_inode_entry *ie, char *attr,
Expand Down
3 changes: 3 additions & 0 deletions src/client/dfuse/dfuse_core.c
Original file line number Diff line number Diff line change
Expand Up @@ -1274,6 +1274,7 @@ dfuse_ie_close(struct dfuse_info *dfuse_info, struct dfuse_inode_entry *ie)
atomic_load_relaxed(&ie->ie_il_count));
D_ASSERTF(atomic_load_relaxed(&ie->ie_open_count) == 0, "open_count is %d",
atomic_load_relaxed(&ie->ie_open_count));
D_ASSERT(!ie->ie_active);

if (ie->ie_obj) {
rc = dfs_release(ie->ie_obj);
Expand Down Expand Up @@ -1317,6 +1318,8 @@ dfuse_read_event_size(void *arg, size_t size)
ev->de_sgl.sg_nr = 1;
}

D_INIT_LIST_HEAD(&ev->de_read_slaves);

rc = daos_event_init(&ev->de_ev, ev->de_eqt->de_eq, NULL);
if (rc != -DER_SUCCESS) {
return false;
Expand Down
9 changes: 7 additions & 2 deletions src/client/dfuse/dfuse_fuseops.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* (C) Copyright 2016-2023 Intel Corporation.
* (C) Copyright 2016-2024 Intel Corporation.
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/
Expand Down Expand Up @@ -88,7 +88,12 @@ dfuse_fuse_init(void *arg, struct fuse_conn_info *conn)
DFUSE_TRA_INFO(dfuse_info, "kernel readdir cache support compiled in");

conn->want |= FUSE_CAP_READDIRPLUS;
conn->want |= FUSE_CAP_READDIRPLUS_AUTO;
/* Temporarily force readdir plus for all cases now, which can
* help to save some lookup RPC for some cases. Though this can be
* removed once we use object enumeration to replace the normal key
* enumeration for readdir. XXX
*/
conn->want &= ~FUSE_CAP_READDIRPLUS_AUTO;

#ifdef FUSE_CAP_CACHE_SYMLINKS
conn->want |= FUSE_CAP_CACHE_SYMLINKS;
Expand Down
26 changes: 8 additions & 18 deletions src/client/dfuse/dfuse_main.c
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ dfuse_bg(struct dfuse_info *dfuse_info)
*
* Should be called from the post_start plugin callback and creates
* a filesystem.
* Returns a DAOS error code.
* Returns true on success, false on failure.
*/
int
Expand Down Expand Up @@ -204,18 +205,17 @@ dfuse_launch_fuse(struct dfuse_info *dfuse_info, struct fuse_args *args)
DFUSE_TRA_ERROR(dfuse_info, "Error sending signal to fg: "DF_RC, DP_RC(rc));

/* Blocking */
if (dfuse_info->di_threaded)
rc = dfuse_loop(dfuse_info);
else
rc = fuse_session_loop(dfuse_info->di_session);
if (rc != 0)
rc = dfuse_loop(dfuse_info);
if (rc != 0) {
DHS_ERROR(dfuse_info, rc, "Fuse loop exited");
rc = daos_errno2der(rc);
}

umount:

fuse_session_unmount(dfuse_info->di_session);

return daos_errno2der(rc);
return rc;
}

#define DF_POOL_PREFIX "pool="
Expand Down Expand Up @@ -279,7 +279,6 @@ show_help(char *name)
" --path=<path> Path to load UNS pool/container data\n"
" --sys-name=STR DAOS system name context for servers\n"
"\n"
" -S --singlethread Single threaded (deprecated)\n"
" -t --thread-count=count Total number of threads to use\n"
" -e --eq-count=count Number of event queues to use\n"
" -f --foreground Run in foreground\n"
Expand Down Expand Up @@ -423,7 +422,6 @@ main(int argc, char **argv)
{"pool", required_argument, 0, 'p'},
{"container", required_argument, 0, 'c'},
{"sys-name", required_argument, 0, 'G'},
{"singlethread", no_argument, 0, 'S'},
{"thread-count", required_argument, 0, 't'},
{"eq-count", required_argument, 0, 'e'},
{"foreground", no_argument, 0, 'f'},
Expand All @@ -447,13 +445,12 @@ main(int argc, char **argv)
if (dfuse_info == NULL)
D_GOTO(out_debug, rc = -DER_NOMEM);

dfuse_info->di_threaded = true;
dfuse_info->di_caching = true;
dfuse_info->di_wb_cache = true;
dfuse_info->di_eq_count = 1;

while (1) {
c = getopt_long(argc, argv, "Mm:St:o:fhe:v", long_options, NULL);
c = getopt_long(argc, argv, "Mm:t:o:fhe:v", long_options, NULL);

if (c == -1)
break;
Expand Down Expand Up @@ -491,13 +488,6 @@ main(int argc, char **argv)
case 'P':
path = optarg;
break;
case 'S':
/* Set it to be single threaded, but allow an extra one
* for the event queue processing
*/
dfuse_info->di_threaded = false;
dfuse_info->di_thread_count = 2;
break;
case 'e':
dfuse_info->di_eq_count = atoi(optarg);
break;
Expand Down Expand Up @@ -564,7 +554,7 @@ main(int argc, char **argv)
* check CPU binding. If bound to a number of cores then launch that number of threads,
* if not bound them limit to 16.
*/
if (dfuse_info->di_threaded && !have_thread_count) {
if (!have_thread_count) {
struct hwloc_topology *hwt;
hwloc_const_cpuset_t hw;
int total;
Expand Down
Loading

0 comments on commit 1144da5

Please sign in to comment.