Improve logging for ETags and FUSE operation flags #618

Merged · 1 commit · Nov 21, 2023
19 changes: 7 additions & 12 deletions mountpoint-s3-client/src/object_client.rs
@@ -21,27 +21,23 @@ pub type GetBodyPart = (u64, Box<[u8]>);
///
/// New ETags can be created with the [`FromStr`] implementation.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
-pub struct ETag {
-    etag: String,
-}
+pub struct ETag(String);

impl ETag {
/// Get the ETag as a string
pub fn as_str(&self) -> &str {
-        &self.etag
+        &self.0
}

/// Unpack the [String] contained by the [ETag] wrapper
pub fn into_inner(self) -> String {
-        self.etag
+        self.0
}

/// Creating default etag for tests
#[doc(hidden)]
pub fn for_tests() -> Self {
-        Self {
-            etag: "test_etag".to_string(),
-        }
+        Self("test_etag".to_string())
}

/// Creating unique etag from bytes
@@ -52,16 +48,15 @@ impl ETag {

let hash = hasher.finalize();
let result = format!("{:x}", hash);
-        Self { etag: result }
+        Self(result)
}
}

impl FromStr for ETag {
type Err = ParseError;
fn from_str(value: &str) -> Result<Self, Self::Err> {
-        Ok(ETag {
-            etag: value.to_string(),
-        })
+        let etag = value.to_string();
+        Ok(ETag(etag))
}
}

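For context on the refactor above: the struct is now a tuple struct (newtype) around `String`, but the public surface (`FromStr`, `as_str`, `into_inner`) is unchanged. Below is a minimal sketch of how callers use it; the type is re-declared locally so the snippet compiles on its own, and the real implementation in `mountpoint-s3-client` uses a `ParseError` error type rather than `Infallible`.

```rust
use std::str::FromStr;

// Minimal local stand-in for the newtype in the diff above, so this sketch
// compiles on its own; the real type lives in mountpoint-s3-client.
#[derive(Debug, Clone, PartialEq, Eq)]
struct ETag(String);

impl ETag {
    /// Get the ETag as a string slice, mirroring `as_str` in the diff.
    fn as_str(&self) -> &str {
        &self.0
    }
}

impl FromStr for ETag {
    // The real implementation uses a `ParseError` type; `Infallible` stands in here.
    type Err = std::convert::Infallible;

    fn from_str(value: &str) -> Result<Self, Self::Err> {
        Ok(ETag(value.to_string()))
    }
}

fn main() {
    // Parse the ETag string returned by S3, then borrow it for logging.
    let etag: ETag = "\"3bebe4037c8f040e0d573af3bdf39050\"".parse().unwrap();
    println!("object etag: {}", etag.as_str());
}
```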
5 changes: 3 additions & 2 deletions mountpoint-s3/src/fs.rs
@@ -560,7 +560,7 @@ where
}

pub async fn open(&self, ino: InodeNo, flags: i32, pid: u32) -> Result<Opened, Error> {
trace!("fs:open with ino {:?} flags {:?} pid {:?}", ino, flags, pid);
trace!("fs:open with ino {:?} flags {:#b} pid {:?}", ino, flags, pid);

let force_revalidate = !self.config.cache_config.serve_lookup_from_cache;
let lookup = self.superblock.getattr(&self.client, ino, force_revalidate).await?;
@@ -594,6 +594,7 @@ where
object_size: lookup.stat.size as u64,
typ: handle_type,
};
+        debug!(fh, ino, "new file handle created");
self.file_handles.write().await.insert(fh, Arc::new(handle));

Ok(Opened { fh, flags: 0 })
@@ -742,7 +743,7 @@ where
}

pub async fn opendir(&self, parent: InodeNo, _flags: i32) -> Result<Opened, Error> {
trace!("fs:opendir with parent {:?} flags {:?}", parent, _flags);
trace!("fs:opendir with parent {:?} flags {:#b}", parent, _flags);

let inode_handle = self.superblock.readdir(&self.client, parent, 1000).await?;

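The `{:?}` to `{:#b}` change in `open` and `opendir` switches the logged FUSE open flags from a decimal integer to a binary literal, which makes the individual flag bits readable. A standalone sketch of the difference, assuming the `libc` crate for the flag constants (the exact value depends on what the kernel passes in the FUSE request):

```rust
fn main() {
    // A typical flag combination a FUSE OPEN request might carry on Linux:
    // write-only plus append.
    let flags: i32 = libc::O_WRONLY | libc::O_APPEND;

    // The old format printed the raw decimal value, which is hard to decode by eye.
    println!("fs:open with flags {:?}", flags); // e.g. "fs:open with flags 1025"

    // The new `{:#b}` format prints the bits, so each flag is visible directly.
    println!("fs:open with flags {:#b}", flags); // e.g. "fs:open with flags 0b10000000001"
}
```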
14 changes: 11 additions & 3 deletions mountpoint-s3/src/inode.rs
@@ -738,7 +738,7 @@ impl SuperblockInner {
// If we reach here, the ListObjects didn't find a shadowing directory, so we know we either
// have a valid file, or both requests failed to find the object so the file must not exist remotely
if let Some(mut stat) = file_state {
trace!(parent = ?parent_ino, ?name, "found a regular file");
trace!(parent = ?parent_ino, ?name, etag =? stat.etag, "found a regular file in S3");
// Update the validity of the stat in case the racing ListObjects took a long time
stat.update_validity(self.cache_config.file_ttl);
Ok(Some(RemoteLookup {
@@ -1174,7 +1174,11 @@ impl Inode {
let mut state = self.inner.sync.write().unwrap();
let lookup_count = &mut state.lookup_count;
*lookup_count += 1;
trace!(new_lookup_count = lookup_count, "incremented lookup count");
trace!(
ino = self.ino(),
new_lookup_count = lookup_count,
"incremented lookup count",
);
*lookup_count
}

@@ -1186,7 +1190,11 @@
let lookup_count = &mut state.lookup_count;
debug_assert!(n <= *lookup_count, "lookup count cannot go negative");
*lookup_count = lookup_count.saturating_sub(n);
trace!(new_lookup_count = lookup_count, "decremented lookup count");
trace!(
ino = self.ino(),
new_lookup_count = lookup_count,
"decremented lookup count",
);
*lookup_count
}

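The `inc_lookup_count`/`dec_lookup_count` changes add the inode number as a structured `tracing` field, so lookup-count log lines can be correlated per inode. A minimal sketch of that field syntax, assuming the `tracing` and `tracing-subscriber` crates; the function and values below are hypothetical stand-ins for the inode state above:

```rust
use tracing::trace;

// Hypothetical stand-in for Inode::inc_lookup_count: `ino` and `lookup_count`
// are plain values here rather than fields behind a lock.
fn log_lookup(ino: u64, lookup_count: u64) {
    // Bare `ino` captures the local variable under the field name "ino";
    // `new_lookup_count = lookup_count` records the counter under a new name.
    trace!(ino, new_lookup_count = lookup_count, "incremented lookup count");
}

fn main() {
    // Emit TRACE-level events so the example actually prints something.
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::TRACE)
        .init();

    log_lookup(42, 3);
}
```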
26 changes: 16 additions & 10 deletions mountpoint-s3/src/prefetch/caching_stream.rs
@@ -109,7 +109,7 @@ where
}

async fn get_from_cache(self, range: RequestRange) {
-        let key = self.cache_key.s3_key.as_str();
+        let cache_key = &self.cache_key;
let block_size = self.cache.block_size();
let block_range = self.block_indices_for_byte_range(&range);

@@ -120,17 +120,23 @@
// already likely negligible.
let mut block_offset = block_range.start * block_size;
for block_index in block_range.clone() {
-            match self.cache.get_block(&self.cache_key, block_index, block_offset) {
+            match self.cache.get_block(cache_key, block_index, block_offset) {
Ok(Some(block)) => {
-                    trace!(?key, ?range, block_index, "cache hit");
+                    trace!(?cache_key, ?range, block_index, "cache hit");
let part = self.make_part(block, block_index, block_offset, &range);
metrics::counter!("cache.total_bytes", part.len() as u64, "type" => "read");
self.part_queue_producer.push(Ok(part));
block_offset += block_size;
continue;
}
-                Ok(None) => trace!(?key, ?range, block_index, "cache miss - no data for block"),
-                Err(error) => error!(?key, ?range, block_index, ?error, "error reading block from cache"),
+                Ok(None) => trace!(?cache_key, block_index, ?range, "cache miss - no data for block"),
+                Err(error) => error!(
+                    ?cache_key,
+                    block_index,
+                    ?range,
+                    ?error,
+                    "error reading block from cache",
+                ),
}
// If a block is uncached or reading it fails, fallback to S3 for the rest of the stream.
return self
@@ -273,24 +279,24 @@ where
"invalid block offset"
);

-        let key = &self.cache_key.s3_key;
+        let cache_key = &self.cache_key;
let block_size = block.len();
let part_range = range
.trim_start(block_offset)
.trim_end(block_offset + block_size as u64);
trace!(
-            key,
-            ?part_range,
+            ?cache_key,
block_index,
+            ?part_range,
block_offset,
block_size,
"creating part from block"
"creating part from block data",
);

let trim_start = (part_range.start().saturating_sub(block_offset)) as usize;
let trim_end = (part_range.end().saturating_sub(block_offset)) as usize;
let bytes = block.slice(trim_start..trim_end);
-        Part::new(key, part_range.start(), bytes)
+        Part::new(cache_key.s3_key.as_str(), part_range.start(), bytes)
}

fn block_indices_for_byte_range(&self, range: &RequestRange) -> Range<BlockIndex> {
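In the caching stream, logging `?cache_key` instead of the bare S3 key means the whole key, including its ETag, is captured via its `Debug` impl. A sketch of the `?` sigil, assuming the same `tracing` crates as above; `CacheKey` here is a simplified hypothetical stand-in for the real cache key type:

```rust
use tracing::trace;

// Simplified, hypothetical stand-in for the cache key used by the caching
// stream; the real type carries more than an S3 key and ETag.
#[derive(Debug)]
struct CacheKey {
    s3_key: String,
    etag: String,
}

fn main() {
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::TRACE)
        .init();

    let cache_key = CacheKey {
        s3_key: "my-prefix/data.bin".to_string(),
        etag: "\"3bebe4037c8f040e0d573af3bdf39050\"".to_string(),
    };

    // The leading `?` records the field with its Debug representation, so the
    // whole struct (key *and* etag) lands in the log line, not just the key.
    trace!(?cache_key, block_index = 0u64, "cache hit");
}
```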