Skip to content

Commit

Permalink
Merge pull request #8846 from dolthub/aaron/nbs-add-table-file-check-…
Browse files Browse the repository at this point in the history
…refs

[no-release-notes] go: nbs,remotesrv: Cleanup and comments for AddTableFilesToManifest ref checks PR.
  • Loading branch information
reltuk authored Feb 11, 2025
2 parents ac6d4d4 + 17d5e24 commit 846b67e
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 3 deletions.
5 changes: 5 additions & 0 deletions go/libraries/doltcore/remotesrv/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,11 @@ func (rs *RemoteChunkStore) AddTableFiles(ctx context.Context, req *remotesapi.A
return &remotesapi.AddTableFilesResponse{Success: true}, nil
}

// Returns a |chunks.GetAddrsCurry| for the nbf (NomsBinFormat)
// corresponding to |version|.
//
// Used to implement chunk reference sanity checks when adding table files that have
// been uploaded by clients to the stores managed by the gRPC server.
func (rs *RemoteChunkStore) getAddrs(version string) chunks.GetAddrsCurry {
fmt, err := types.GetFormatForVersionString(version)
if err != nil {
Expand Down
8 changes: 8 additions & 0 deletions go/store/nbs/journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,14 @@ func trueUpBackingManifest(ctx context.Context, root hash.Hash, backing *journal
if err != nil {
return manifestContents{}, err
} else if !ok {
// If there is no backing manifest yet, we simply
// return without any manifest contents. We can open a
// newly created (cloned) journal file before the
// manifest corresponding to its existence has been
// created. |*ChunkJournal.ParseIfExists| forwards
// to the backing store in the case that the loaded
// manifest is currently empty, so eventually the
// manifest will be created.
return manifestContents{}, nil
}

Expand Down
16 changes: 13 additions & 3 deletions go/store/nbs/table_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,8 @@ func (ts tableSet) openForAdd(ctx context.Context, files map[hash.Hash]uint32, s
source.close()
}
}
// First add clones of all sources that are already present in
// ts.novel or ts.upstream.
for h := range files {
if s, ok := ts.novel[h]; ok {
cloned, err := s.clone()
Expand All @@ -457,16 +459,24 @@ func (ts tableSet) openForAdd(ctx context.Context, files map[hash.Hash]uint32, s
ret[h] = cloned
}
}
// Concurrently open all files that are not already
// in |ret|.
eg, ctx := errgroup.WithContext(ctx)
var mu sync.Mutex
for h, c := range files {
for fileId, chunkCount := range files {
mu.Lock()
_, ok := ret[fileId]
mu.Unlock()
if ok {
continue
}
eg.Go(func() error {
cs, err := ts.p.Open(ctx, h, c, stats)
cs, err := ts.p.Open(ctx, fileId, chunkCount, stats)
if err != nil {
return err
}
mu.Lock()
ret[h] = cs
ret[fileId] = cs
mu.Unlock()
return nil
})
Expand Down

0 comments on commit 846b67e

Please sign in to comment.