From a5d679426802f3f6f8b083325b4d439e5af6aea6 Mon Sep 17 00:00:00 2001 From: zouxu Date: Wed, 29 May 2024 14:37:34 +0800 Subject: [PATCH] disk method support error --- addr/addr.go | 2 +- buf/buf.go | 13 +- buf/buf_test.go | 2 +- common/common.go | 2 +- disk/disk.go | 36 ++++++ disk/disk_impl.go | 147 ++++++++++++++++++++++ go.mod | 2 +- go.sum | 9 +- jrnl/jrnl.go | 23 ++-- jrnl/jrnl_test.go | 81 ++++++++---- jrnl_replication/jrnl_replication.go | 19 +-- jrnl_replication/jrnl_replication_test.go | 26 ++-- obj/obj.go | 43 ++++--- txn/txn.go | 41 +++--- txn/txn_test.go | 25 ++-- wal/00walconst.go | 31 ++--- wal/0circular.go | 21 +++- wal/0waldefs.go | 2 +- wal/installer.go | 2 +- wal/wal.go | 38 +++--- wal/wal_test.go | 24 +++- 21 files changed, 434 insertions(+), 155 deletions(-) create mode 100644 disk/disk.go create mode 100644 disk/disk_impl.go diff --git a/addr/addr.go b/addr/addr.go index 502dd36..000bfef 100644 --- a/addr/addr.go +++ b/addr/addr.go @@ -1,7 +1,7 @@ package addr import ( - "github.com/tchajed/goose/machine/disk" + "github.com/mit-pdos/go-journal/disk" "github.com/mit-pdos/go-journal/common" ) diff --git a/buf/buf.go b/buf/buf.go index 370217d..8b68111 100644 --- a/buf/buf.go +++ b/buf/buf.go @@ -2,7 +2,7 @@ package buf import ( - "github.com/tchajed/goose/machine/disk" + "github.com/mit-pdos/go-journal/disk" "github.com/tchajed/marshal" "github.com/mit-pdos/go-journal/addr" @@ -93,14 +93,17 @@ func (buf *Buf) SetDirty() { buf.dirty = true } -func (buf *Buf) WriteDirect(d disk.Disk) { +func (buf *Buf) WriteDirect(d disk.Disk) error { buf.SetDirty() if buf.Sz == disk.BlockSize { - d.Write(uint64(buf.Addr.Blkno), buf.Data) + return d.Write(uint64(buf.Addr.Blkno), buf.Data) } else { - blk := d.Read(uint64(buf.Addr.Blkno)) + blk, err := d.Read(uint64(buf.Addr.Blkno)) + if err != nil { + return err + } buf.Install(blk) - d.Write(uint64(buf.Addr.Blkno), blk) + return d.Write(uint64(buf.Addr.Blkno), blk) } } diff --git a/buf/buf_test.go b/buf/buf_test.go index f14dbed..94e48c9 100644 --- a/buf/buf_test.go +++ b/buf/buf_test.go @@ -1,7 +1,7 @@ package buf import ( - "github.com/tchajed/goose/machine/disk" + "github.com/mit-pdos/go-journal/disk" "testing" diff --git a/common/common.go b/common/common.go index 98b58df..5ad9660 100644 --- a/common/common.go +++ b/common/common.go @@ -1,7 +1,7 @@ package common import ( - "github.com/tchajed/goose/machine/disk" + "github.com/mit-pdos/go-journal/disk" ) const ( diff --git a/disk/disk.go b/disk/disk.go new file mode 100644 index 0000000..5445f5b --- /dev/null +++ b/disk/disk.go @@ -0,0 +1,36 @@ +package disk + +// Block is a 4096-byte buffer +type Block = []byte + +const BlockSize uint64 = 4096 //128 // 32 // 4096 + +// Disk provides access to a logical block-based disk +type Disk interface { + // Read reads a disk block by address + // + // Expects a < Size(). + Read(a uint64) (Block, error) + + // ReadTo reads the disk block at a and stores the result in b + // + // Expects a < Size(). + ReadTo(a uint64, b Block) error + + // Write updates a disk block by address + // + // Expects a < Size(). + Write(a uint64, v Block) error + + // Size reports how big the disk is, in blocks + Size() (uint64, error) + + // Barrier ensures data is persisted. + // + // When it returns, all outstanding writes are guaranteed to be durably on + // disk + Barrier() error + + // Close releases any resources used by the disk and makes it unusable. + Close() error +} diff --git a/disk/disk_impl.go b/disk/disk_impl.go new file mode 100644 index 0000000..ec75314 --- /dev/null +++ b/disk/disk_impl.go @@ -0,0 +1,147 @@ +package disk + +import ( + "fmt" + "sync" + + "golang.org/x/sys/unix" +) + +var _ Disk = (*fileDisk)(nil) + +type fileDisk struct { + fd int + numBlocks uint64 +} + +func NewFileDisk(path string, numBlocks uint64) (fileDisk, error) { + fd, err := unix.Open(path, unix.O_RDWR|unix.O_CREAT, 0666) + if err != nil { + return fileDisk{}, err + } + var stat unix.Stat_t + err = unix.Fstat(fd, &stat) + if err != nil { + return fileDisk{}, err + } + if (stat.Mode&unix.S_IFREG) != 0 && uint64(stat.Size) != numBlocks { + err = unix.Ftruncate(fd, int64(numBlocks*BlockSize)) + if err != nil { + return fileDisk{}, err + } + } + return fileDisk{fd, numBlocks}, nil +} + +// var _ Disk = FileDisk{} + +func (d fileDisk) ReadTo(a uint64, buf Block) error { + if uint64(len(buf)) != BlockSize { + panic("buffer is not block-sized") + } + if a >= d.numBlocks { + panic(fmt.Errorf("out-of-bounds read at %v", a)) + } + _, err := unix.Pread(d.fd, buf, int64(a*BlockSize)) + if err != nil { + panic("read failed: " + err.Error()) + } + fmt.Printf("read: %v-%v\n", a, buf) + return nil +} + +func (d fileDisk) Read(a uint64) (Block, error) { + buf := make([]byte, BlockSize) + err := d.ReadTo(a, buf) + return buf, err +} + +func (d fileDisk) Write(a uint64, v Block) error { + if uint64(len(v)) != BlockSize { + panic(fmt.Errorf("v is not block sized (%d bytes)", len(v))) + } + if a >= d.numBlocks { + panic(fmt.Errorf("out-of-bounds write at %v", a)) + } + _, err := unix.Pwrite(d.fd, v, int64(a*BlockSize)) + if err != nil { + panic("write failed: " + err.Error()) + } + fmt.Printf("write: %v-%v\n", a, v) + return nil +} + +func (d fileDisk) Size() (uint64, error) { + return d.numBlocks, nil +} + +func (d fileDisk) Barrier() error { + // NOTE: on macOS, this flushes to the drive but doesn't actually issue a + // disk barrier; see https://golang.org/src/internal/poll/fd_fsync_darwin.go + // for more details. The correct replacement is to issue a fcntl syscall with + // cmd F_FULLFSYNC. + err := unix.Fsync(d.fd) + if err != nil { + panic("file sync failed: " + err.Error()) + } + fmt.Printf("barrier\n") + return nil +} + +func (d fileDisk) Close() error { + err := unix.Close(d.fd) + if err != nil { + panic(err) + } + return nil +} + +var _ Disk = (*memDisk)(nil) + +type memDisk struct { + l *sync.RWMutex + blocks [][BlockSize]byte +} + +func NewMemDisk(numBlocks uint64) memDisk { + blocks := make([][BlockSize]byte, numBlocks) + return memDisk{l: new(sync.RWMutex), blocks: blocks} +} + +func (d memDisk) ReadTo(a uint64, buf Block) error { + d.l.RLock() + defer d.l.RUnlock() + if a >= uint64(len(d.blocks)) { + panic(fmt.Errorf("out-of-bounds read at %v", a)) + } + copy(buf, d.blocks[a][:]) + return nil +} + +func (d memDisk) Read(a uint64) (Block, error) { + buf := make(Block, BlockSize) + d.ReadTo(a, buf) + return buf, nil +} + +func (d memDisk) Write(a uint64, v Block) error { + if uint64(len(v)) != BlockSize { + panic(fmt.Errorf("v is not block-sized (%d bytes)", len(v))) + } + d.l.Lock() + defer d.l.Unlock() + if a >= uint64(len(d.blocks)) { + panic(fmt.Errorf("out-of-bounds write at %v", a)) + } + copy(d.blocks[a][:], v) + return nil +} + +func (d memDisk) Size() (uint64, error) { + // this never changes so we assume it's safe to run lock-free + return uint64(len(d.blocks)), nil +} + +func (d memDisk) Barrier() error { return nil } + +func (d memDisk) Close() error { return nil } diff --git a/go.mod b/go.mod index 6dd6238..b6d1ee8 100644 --- a/go.mod +++ b/go.mod @@ -6,5 +6,5 @@ require ( github.com/stretchr/testify v1.8.4 github.com/tchajed/goose v0.5.3 github.com/tchajed/marshal v0.4.3 - golang.org/x/sys v0.16.0 // indirect + golang.org/x/sys v0.6.0 ) diff --git a/go.sum b/go.sum index 210cc9b..0f57f94 100644 --- a/go.sum +++ b/go.sum @@ -38,7 +38,6 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= -github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= @@ -48,7 +47,6 @@ github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXl github.com/tchajed/goose v0.0.0-20191114201541-ebbf1d75c8ca/go.mod h1:2c33VcNcIHG8vQhprFmBlZxpC62+TfmnGjB+jVaKhXo= github.com/tchajed/goose v0.0.0-20200128225509-92a5cfe01fc4/go.mod h1:rhep/Jc/mYPoMIYG8dPtnc71UHAPHNYRx1qvpB245Ss= github.com/tchajed/goose v0.2.0/go.mod h1:Y2zBxxd3wa0lrYRDyebxZgbgV4LLsPUtsBiLe1wnS9c= -github.com/tchajed/goose v0.3.1 h1:pofh2XDZR5lt+WYgKBZbHwyD2QPgea/37gRbLa2AecI= github.com/tchajed/goose v0.3.1/go.mod h1:s6D4ibH0SaPtDcp0krx05XkZLlqipgRyRH8X6JP+hcc= github.com/tchajed/goose v0.4.4/go.mod h1:w5NCIqICRsj0OJfTuft7HO2H51u629Qn0d+wxZsu6Eo= github.com/tchajed/goose v0.5.3 h1:+Ag8wwWfsQZfsTSW953HfDvJmzQ6YopLsfnTNzcd/ZE= @@ -56,7 +54,6 @@ github.com/tchajed/goose v0.5.3/go.mod h1:ISQLiMmllwxTyKhi5wn/PKE3CeseL6Jnvv3CCW github.com/tchajed/mailboat v0.0.0-20191026015926-338a5b81ac1d/go.mod h1:dmgNTEH0kreeKMWd+ntzsrp7ie2LWHGMe7VxP6d1Aew= github.com/tchajed/mailboat v0.2.0/go.mod h1:aKa/T1YCMVZFM2xbXnMNyp9r4k0pPni4+sJ8GoY51Hw= github.com/tchajed/marshal v0.0.0-20200707011626-0d2aa09818a9/go.mod h1:TPo3bTYJkH87/4rXlxe0bpVWLnN+b5kjJnoXHLBfdaA= -github.com/tchajed/marshal v0.2.0 h1:6syf2SG3//AE891aHhj6BS+fv3OCMxFM322x05ot3QE= github.com/tchajed/marshal v0.2.0/go.mod h1:jq2B5uP+QvY51sZB57Td/OsareaZEzm3HVuvVKhC22I= github.com/tchajed/marshal v0.4.3 h1:dnLvl3JDzWMAj2xyCSWxiOr5PcZlu++KdPdl1/Ts51g= github.com/tchajed/marshal v0.4.3/go.mod h1:tdiYzC42jUHdvBWrJYa7LytEbHEV2zA+nI3TLLb/nvU= @@ -99,8 +96,6 @@ golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf h1:2ucpDCmfkl8Bd/FsLtiD653Wf96cW37s+iGx93zsu4k= -golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220319134239-a9b59b0215f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -108,9 +103,8 @@ golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= -golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= @@ -139,7 +133,6 @@ gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 h1:tQIYjPdBoyREyB9XMu+nnTclpTYkz2zFM+lzLJFO4gQ= gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/jrnl/jrnl.go b/jrnl/jrnl.go index 41a795e..70ceb4f 100644 --- a/jrnl/jrnl.go +++ b/jrnl/jrnl.go @@ -35,16 +35,18 @@ package jrnl import ( "github.com/mit-pdos/go-journal/addr" "github.com/mit-pdos/go-journal/buf" + "github.com/mit-pdos/go-journal/disk" "github.com/mit-pdos/go-journal/obj" "github.com/mit-pdos/go-journal/util" + "github.com/mit-pdos/go-journal/wal" ) // LogBlocks is the maximum number of blocks that can be written in one // operation -const LogBlocks uint64 = 511 +const LogBlocks uint64 = wal.LOGSZ //3 // LogBytes is the maximum size of an operation, in bytes -const LogBytes uint64 = 4096 * 511 +const LogBytes uint64 = disk.BlockSize * LogBlocks // Op is an in-progress journal operation. // @@ -66,14 +68,17 @@ func Begin(log *obj.Log) *Op { return trans } -func (op *Op) ReadBuf(addr addr.Addr, sz uint64) *buf.Buf { +func (op *Op) ReadBuf(addr addr.Addr, sz uint64) (*buf.Buf, error) { b := op.bufs.Lookup(addr) if b == nil { - buf := op.log.Load(addr, sz) + buf, err := op.log.Load(addr, sz) + if err != nil { + return nil, err + } op.bufs.Insert(buf) - return op.bufs.Lookup(addr) + return op.bufs.Lookup(addr), nil } - return b + return b, nil } // OverWrite writes an object to addr @@ -111,8 +116,8 @@ func (op *Op) NDirty() uint64 { // // wait=false is an asynchronous commit, which can be made durable later with // Flush. -func (op *Op) CommitWait(wait bool) bool { +func (op *Op) CommitWait(wait bool) error { util.DPrintf(3, "Commit %p w %v\n", op, wait) - ok := op.log.CommitWait(op.bufs.DirtyBufs(), wait) - return ok + err := op.log.CommitWait(op.bufs.DirtyBufs(), wait) + return err } diff --git a/jrnl/jrnl_test.go b/jrnl/jrnl_test.go index 047a814..10446a0 100644 --- a/jrnl/jrnl_test.go +++ b/jrnl/jrnl_test.go @@ -1,17 +1,18 @@ package jrnl_test import ( + "fmt" "math/rand" "sync" "testing" "github.com/mit-pdos/go-journal/addr" + "github.com/mit-pdos/go-journal/disk" "github.com/mit-pdos/go-journal/jrnl" "github.com/mit-pdos/go-journal/obj" "github.com/mit-pdos/go-journal/util" "github.com/mit-pdos/go-journal/wal" "github.com/stretchr/testify/assert" - "github.com/tchajed/goose/machine/disk" ) func TestSizeConstants(t *testing.T) { @@ -25,55 +26,86 @@ func data(sz int) []byte { return d } -const inodeSz uint64 = 8 * 128 +const inodeSz uint64 = 8 * 32 func inodeAddr(i uint64) addr.Addr { - return addr.MkAddr(513+i/32, (i%32)*inodeSz) + inodeCountBlk := disk.BlockSize / (inodeSz / 8) + return addr.MkAddr(wal.LOGDISKBLOCKS+i/inodeCountBlk, (i%inodeCountBlk)*inodeSz) } func TestJrnlWriteRead(t *testing.T) { - d := disk.NewMemDisk(10000) - log := obj.MkLog(d) + // util.Debug = 10 - op := jrnl.Begin(log) - bs0 := data(128) - bs1 := data(128) - op.OverWrite(inodeAddr(0), inodeSz, bs0) - op.OverWrite(inodeAddr(1), inodeSz, bs1) - op.CommitWait(true) + var d disk.Disk = disk.NewMemDisk(wal.LOGDISKBLOCKS + 15) + + fmt.Printf("log size: %v\n", jrnl.LogBlocks) + + // pwd, _ := os.Getwd() + // path := pwd + "/disk.log" + // os.Remove(path) + // d, err := disk.NewFileDisk(path, wal.LOGDISKBLOCKS+15) + // assert.Nil(t, err) + + log, err := obj.MkLog(d) + assert.Nil(t, err) + + if true { + op := jrnl.Begin(log) + for i := 0; i < 3; i++ { + op.OverWrite(inodeAddr(uint64(i)), inodeSz, data(int(inodeSz/8))) + } + op.CommitWait(true) + } + + if true { + op := jrnl.Begin(log) + bs0 := data(int(inodeSz / 8)) + bs1 := data(int(inodeSz / 8)) + op.OverWrite(inodeAddr(0), inodeSz, bs0) + op.OverWrite(inodeAddr(1), inodeSz, bs1) + op.CommitWait(true) + + op = jrnl.Begin(log) + buf, err := op.ReadBuf(inodeAddr(0), inodeSz) + assert.Nil(t, err) + assert.Equal(t, bs0, buf.Data) + buf, err = op.ReadBuf(inodeAddr(1), inodeSz) + + assert.Nil(t, err) + assert.Equal(t, bs1, buf.Data) + } - op = jrnl.Begin(log) - buf := op.ReadBuf(inodeAddr(0), inodeSz) - assert.Equal(t, bs0, buf.Data) - buf = op.ReadBuf(inodeAddr(1), inodeSz) - assert.Equal(t, bs1, buf.Data) } func assertObj(t *testing.T, expected []byte, op *jrnl.Op, a addr.Addr, msgAndArgs ...interface{}) { t.Helper() sz := 8 * uint64(len(expected)) - buf := op.ReadBuf(a, sz) + buf, err := op.ReadBuf(a, sz) + assert.Nil(t, err) assert.Equal(t, expected, buf.Data, msgAndArgs...) } func TestJrnlReadSetDirty(t *testing.T) { d := disk.NewMemDisk(10000) - log := obj.MkLog(d) + log, err := obj.MkLog(d) + assert.Nil(t, err) op := jrnl.Begin(log) // initialize with non-zero data - bs0 := data(128) - bs1 := data(128) + bs0 := data(int(inodeSz / 8)) + bs1 := data(int(inodeSz / 8)) op.OverWrite(inodeAddr(0), inodeSz, util.CloneByteSlice(bs0)) op.OverWrite(inodeAddr(1), inodeSz, util.CloneByteSlice(bs1)) op.CommitWait(true) log.Shutdown() - log = obj.MkLog(d) + log, err = obj.MkLog(d) + assert.Nil(t, err) op = jrnl.Begin(log) // modify just inode 1 through ReadBuf - buf := op.ReadBuf(inodeAddr(1), inodeSz) + buf, err := op.ReadBuf(inodeAddr(1), inodeSz) + assert.Nil(t, err) buf.Data[0], buf.Data[1] = 0, 0 buf.SetDirty() op.CommitWait(true) @@ -85,7 +117,8 @@ func TestJrnlReadSetDirty(t *testing.T) { func testJrnlConcurrentOperations(t *testing.T, wait bool) { d := disk.NewMemDisk(10000) - log := obj.MkLog(d) + log, err := obj.MkLog(d) + assert.Nil(t, err) // 2048 = 64*32, so 64 blocks worth of "inodes" const numInodes = 2048 @@ -97,7 +130,7 @@ func testJrnlConcurrentOperations(t *testing.T, wait bool) { i := i go func() { op := jrnl.Begin(log) - bs := data(128) + bs := data(int(inodeSz / 8)) op.OverWrite(inodeAddr(i), inodeSz, bs) op.CommitWait(wait) inodes[i] = bs diff --git a/jrnl_replication/jrnl_replication.go b/jrnl_replication/jrnl_replication.go index 39dbdff..698a5fa 100644 --- a/jrnl_replication/jrnl_replication.go +++ b/jrnl_replication/jrnl_replication.go @@ -3,7 +3,7 @@ package replicated_block import ( "sync" - "github.com/tchajed/goose/machine/disk" + "github.com/mit-pdos/go-journal/disk" "github.com/mit-pdos/go-journal/addr" "github.com/mit-pdos/go-journal/common" @@ -31,23 +31,26 @@ func Open(txn *obj.Log, a common.Bnum) *RepBlock { // can fail in principle if CommitWait fails, // but that's really impossible since it's an empty transaction -func (rb *RepBlock) Read() (disk.Block, bool) { +func (rb *RepBlock) Read() (disk.Block, error) { rb.m.Lock() tx := jrnl.Begin(rb.txn) - buf := tx.ReadBuf(rb.a0, 8*disk.BlockSize) + buf, err := tx.ReadBuf(rb.a0, 8*disk.BlockSize) + if err != nil { + return nil, err + } b := util.CloneByteSlice(buf.Data) // now we can reassemble the transaction - ok := tx.CommitWait(true) + err = tx.CommitWait(true) rb.m.Unlock() - return b, ok + return b, err } -func (rb *RepBlock) Write(b disk.Block) bool { +func (rb *RepBlock) Write(b disk.Block) error { rb.m.Lock() tx := jrnl.Begin(rb.txn) tx.OverWrite(rb.a0, 8*disk.BlockSize, b) tx.OverWrite(rb.a1, 8*disk.BlockSize, b) - ok := tx.CommitWait(true) + err := tx.CommitWait(true) rb.m.Unlock() - return ok + return err } diff --git a/jrnl_replication/jrnl_replication_test.go b/jrnl_replication/jrnl_replication_test.go index 01b2c12..50210dc 100644 --- a/jrnl_replication/jrnl_replication_test.go +++ b/jrnl_replication/jrnl_replication_test.go @@ -3,8 +3,8 @@ package replicated_block import ( "testing" + "github.com/mit-pdos/go-journal/disk" "github.com/stretchr/testify/assert" - "github.com/tchajed/goose/machine/disk" "github.com/mit-pdos/go-journal/obj" ) @@ -17,13 +17,14 @@ func mkBlock(b0 byte) disk.Block { func TestRepBlock(t *testing.T) { d := disk.NewMemDisk(1000) - tx := obj.MkLog(d) + tx, err := obj.MkLog(d) + assert.Nil(t, err) rb := Open(tx, 514) - ok := rb.Write(mkBlock(1)) - assert.True(t, ok, "write txn should succeed") + err = rb.Write(mkBlock(1)) + assert.Nil(t, err, "write txn should succeed") - b, ok := rb.Read() - assert.True(t, ok, "read-only txn should succeed") + b, err := rb.Read() + assert.Nil(t, err, "read-only txn should succeed") assert.Equal(t, byte(1), b[0]) tx.Shutdown() @@ -31,15 +32,18 @@ func TestRepBlock(t *testing.T) { func TestRepBlockRecovery(t *testing.T) { d := disk.NewMemDisk(1000) - tx := obj.MkLog(d) + tx, err := obj.MkLog(d) + assert.Nil(t, err) rb := Open(tx, 514) - ok := rb.Write(mkBlock(1)) - assert.True(t, ok, "write txn should succeed") + err = rb.Write(mkBlock(1)) + assert.Nil(t, err, "write txn should succeed") tx.Shutdown() - tx2 := obj.MkLog(d) + tx2, err := obj.MkLog(d) + assert.Nil(t, err) rb2 := Open(tx2, 514) - b, _ := rb2.Read() + b, err := rb2.Read() + assert.Nil(t, err) assert.Equal(t, byte(1), b[0], "rep block should be crash safe") tx2.Shutdown() } diff --git a/obj/obj.go b/obj/obj.go index 9b06d18..de5946f 100644 --- a/obj/obj.go +++ b/obj/obj.go @@ -4,7 +4,7 @@ package obj import ( - "github.com/tchajed/goose/machine/disk" + "github.com/mit-pdos/go-journal/disk" "github.com/mit-pdos/go-journal/addr" "github.com/mit-pdos/go-journal/buf" @@ -26,20 +26,27 @@ type Log struct { // MkLog recovers the object logging system // (or initializes from an all-zero disk). -func MkLog(d disk.Disk) *Log { +func MkLog(d disk.Disk) (*Log, error) { + mklog, err := wal.MkLog(d) + if err != nil { + return nil, err + } log := &Log{ mu: new(sync.Mutex), - log: wal.MkLog(d), + log: mklog, pos: wal.LogPosition(0), } - return log + return log, nil } // Read a disk object into buf -func (l *Log) Load(addr addr.Addr, sz uint64) *buf.Buf { - blk := l.log.Read(addr.Blkno) +func (l *Log) Load(addr addr.Addr, sz uint64) (*buf.Buf, error) { + blk, err := l.log.Read(addr.Blkno) + if err != nil { + return nil, err + } b := buf.MkBufLoad(addr, sz, blk) - return b + return b, nil } // Installs bufs into their blocks and returns the blocks. @@ -57,7 +64,7 @@ func (l *Log) installBufsMap(bufs []*buf.Buf) map[common.Bnum][]byte { if ok { blk = mapblk } else { - blk = l.log.Read(b.Addr.Blkno) + blk, _ = l.log.Read(b.Addr.Blkno) blks[b.Addr.Blkno] = blk } b.Install(blk) @@ -78,30 +85,31 @@ func (l *Log) installBufs(bufs []*buf.Buf) []wal.Update { // Acquires the commit log, installs the buffers into their // blocks, and appends the blocks to the in-memory log. -func (l *Log) doCommit(bufs []*buf.Buf) (wal.LogPosition, bool) { +func (l *Log) doCommit(bufs []*buf.Buf) (wal.LogPosition, error) { l.mu.Lock() blks := l.installBufs(bufs) util.DPrintf(3, "doCommit: %v bufs\n", len(blks)) - n, ok := l.log.MemAppend(blks) + n, err := l.log.MemAppend(blks) // FIXME: should only be set if ok l.pos = n l.mu.Unlock() - return n, ok + return n, err } // Commit dirty bufs of the transaction into the log, and perhaps wait. -func (l *Log) CommitWait(bufs []*buf.Buf, wait bool) bool { - var commit = true +func (l *Log) CommitWait(bufs []*buf.Buf, wait bool) error { + // var commit = true if len(bufs) > 0 { - n, ok := l.doCommit(bufs) - if !ok { + n, err := l.doCommit(bufs) + if err != nil { util.DPrintf(10, "memappend failed; log is too small\n") - commit = false + // commit = false + return err } else { if wait { l.log.Flush(n) @@ -110,7 +118,8 @@ func (l *Log) CommitWait(bufs []*buf.Buf, wait bool) bool { } else { util.DPrintf(5, "commit read-only trans\n") } - return commit + return nil + // return commit } // NOTE: this is coarse-grained and unattached to the transaction ID diff --git a/txn/txn.go b/txn/txn.go index bcdbca4..a880388 100644 --- a/txn/txn.go +++ b/txn/txn.go @@ -6,7 +6,7 @@ package txn import ( - "github.com/tchajed/goose/machine/disk" + "github.com/mit-pdos/go-journal/disk" "github.com/mit-pdos/go-journal/addr" "github.com/mit-pdos/go-journal/jrnl" @@ -26,12 +26,16 @@ type Txn struct { acquired map[uint64]bool } -func Init(d disk.Disk) *Log { +func Init(d disk.Disk) (*Log, error) { + mklog, err := obj.MkLog(d) + if err != nil { + return nil, err + } twophasePre := &Log{ - log: obj.MkLog(d), + log: mklog, locks: lockmap.MkLockMap(), } - return twophasePre + return twophasePre, nil } // Start a local transaction with no writes from a global Log. @@ -73,16 +77,20 @@ func (txn *Txn) ReleaseAll() { } } -func (txn *Txn) readBufNoAcquire(addr addr.Addr, sz uint64) []byte { +func (txn *Txn) readBufNoAcquire(addr addr.Addr, sz uint64) ([]byte, error) { // PERFORMANCE-IMPACTING HACK: // Copying out the data to a new slice isn't necessary, // but we need to make it explicit to the proof that we // aren't using the read-modify feature of buftxn. - s := util.CloneByteSlice(txn.buftxn.ReadBuf(addr, sz).Data) - return s + buf, err := txn.buftxn.ReadBuf(addr, sz) + if err != nil { + return nil, err + } + s := util.CloneByteSlice(buf.Data) + return s, nil } -func (txn *Txn) ReadBuf(addr addr.Addr, sz uint64) []byte { +func (txn *Txn) ReadBuf(addr addr.Addr, sz uint64) ([]byte, error) { txn.Acquire(addr) return txn.readBufNoAcquire(addr, sz) } @@ -93,9 +101,12 @@ func (txn *Txn) OverWrite(addr addr.Addr, sz uint64, data []byte) { txn.buftxn.OverWrite(addr, sz, data) } -func (txn *Txn) ReadBufBit(addr addr.Addr) bool { - dataByte := txn.ReadBuf(addr, 1)[0] - return 1 == ((dataByte >> (addr.Off % 8)) & 1) +func (txn *Txn) ReadBufBit(addr addr.Addr) (bool, error) { + dataByte, err := txn.ReadBuf(addr, 1) + if err != nil { + return false, err + } + return 1 == ((dataByte[0] >> (addr.Off % 8)) & 1), nil } func bitToByte(off uint64, data bool) byte { @@ -122,13 +133,13 @@ func (txn *Txn) NDirty() uint64 { return txn.buftxn.NDirty() } -func (txn *Txn) commitNoRelease(wait bool) bool { +func (txn *Txn) commitNoRelease(wait bool) error { util.DPrintf(5, "tp Commit %p\n", txn) return txn.buftxn.CommitWait(wait) } -func (txn *Txn) Commit(wait bool) bool { - ok := txn.commitNoRelease(wait) +func (txn *Txn) Commit(wait bool) error { + err := txn.commitNoRelease(wait) txn.ReleaseAll() - return ok + return err } diff --git a/txn/txn_test.go b/txn/txn_test.go index 69aea75..e569d53 100644 --- a/txn/txn_test.go +++ b/txn/txn_test.go @@ -5,9 +5,10 @@ import ( "testing" "github.com/mit-pdos/go-journal/addr" + "github.com/mit-pdos/go-journal/disk" "github.com/mit-pdos/go-journal/txn" + "github.com/mit-pdos/go-journal/wal" "github.com/stretchr/testify/assert" - "github.com/tchajed/goose/machine/disk" ) func data(sz int) []byte { @@ -23,37 +24,41 @@ func blockAddr(a uint64) addr.Addr { } } -const blockSz uint64 = 8 * 4096 +const blockSz uint64 = 8 * disk.BlockSize func TestReadWrite(t *testing.T) { d := disk.NewMemDisk(10000) - tsys := txn.Init(d) + tsys, err := txn.Init(d) + assert.Nil(t, err) - x := data(4096) + x := data(int(disk.BlockSize)) tx := txn.Begin(tsys) - tx.OverWrite(blockAddr(513), blockSz, x) + tx.OverWrite(blockAddr(wal.LOGDISKBLOCKS), blockSz, x) tx.Commit(true) tx = txn.Begin(tsys) - buf := tx.ReadBuf(blockAddr(513), blockSz) + buf, err := tx.ReadBuf(blockAddr(wal.LOGDISKBLOCKS), blockSz) + assert.Nil(t, err) assert.Equal(t, x, buf, "read incorrect data") tx.ReleaseAll() } func TestReadWriteAsync(t *testing.T) { d := disk.NewMemDisk(10000) - tsys := txn.Init(d) + tsys, err := txn.Init(d) + assert.Nil(t, err) - x := data(4096) + x := data(int(disk.BlockSize)) tx := txn.Begin(tsys) - tx.OverWrite(blockAddr(513), blockSz, x) + tx.OverWrite(blockAddr(wal.LOGDISKBLOCKS), blockSz, x) tx.Commit(false) tsys.Flush() tx = txn.Begin(tsys) - buf := tx.ReadBuf(blockAddr(513), blockSz) + buf, err := tx.ReadBuf(blockAddr(wal.LOGDISKBLOCKS), blockSz) + assert.Nil(t, err) assert.Equal(t, x, buf, "read incorrect data") tx.ReleaseAll() } diff --git a/wal/00walconst.go b/wal/00walconst.go index fbd775b..59fd45e 100644 --- a/wal/00walconst.go +++ b/wal/00walconst.go @@ -1,23 +1,24 @@ -// wal implements write-ahead logging +// wal implements write-ahead logging // -// The layout of log: -// [ installed writes | logged writes | in-memory/logged | unstable in-memory ] -// ^ ^ ^ ^ -// 0 memStart diskEnd nextDiskEnd +// The layout of log: +// [ installed writes | logged writes | in-memory/logged | unstable in-memory ] // -// Blocks in the range [diskEnd, nextDiskEnd) are in the process of -// being logged. Blocks in unstable are unstably committed (i.e., -// written by NFS Write with the unstable flag and they can be lost -// on crash). Later transactions may absorb them (e.g., a later NFS -// write may update the same inode or indirect block). The code -// implements a policy of postponing writing unstable blocks to disk -// as long as possible to maximize the chance of absorption (i.e., -// commitWait or log is full). It may better to start logging -// earlier. +// ^ ^ ^ ^ +// 0 memStart diskEnd nextDiskEnd +// +// Blocks in the range [diskEnd, nextDiskEnd) are in the process of +// being logged. Blocks in unstable are unstably committed (i.e., +// written by NFS Write with the unstable flag and they can be lost +// on crash). Later transactions may absorb them (e.g., a later NFS +// write may update the same inode or indirect block). The code +// implements a policy of postponing writing unstable blocks to disk +// as long as possible to maximize the chance of absorption (i.e., +// commitWait or log is full). It may better to start logging +// earlier. package wal import ( - "github.com/tchajed/goose/machine/disk" + "github.com/mit-pdos/go-journal/disk" "github.com/mit-pdos/go-journal/common" ) diff --git a/wal/0circular.go b/wal/0circular.go index d5f365d..bb69417 100644 --- a/wal/0circular.go +++ b/wal/0circular.go @@ -1,7 +1,7 @@ package wal import ( - "github.com/tchajed/goose/machine/disk" + "github.com/mit-pdos/go-journal/disk" "github.com/tchajed/marshal" "github.com/mit-pdos/go-journal/common" @@ -51,20 +51,29 @@ func decodeHdr2(hdr2 disk.Block) uint64 { return start } -func recoverCircular(d disk.Disk) (*circularAppender, LogPosition, LogPosition, []Update) { - hdr1 := d.Read(LOGHDR) - hdr2 := d.Read(LOGHDR2) +func recoverCircular(d disk.Disk) (*circularAppender, LogPosition, LogPosition, []Update, error) { + hdr1, err := d.Read(LOGHDR) + if err != nil { + return nil, 0, 0, nil, err + } + hdr2, err := d.Read(LOGHDR2) + if err != nil { + return nil, 0, 0, nil, err + } end, addrs := decodeHdr1(hdr1) start := decodeHdr2(hdr2) var bufs []Update for pos := start; pos < end; pos++ { addr := addrs[pos%LOGSZ] - b := d.Read(LOGSTART + pos%LOGSZ) + b, err := d.Read(LOGSTART + pos%LOGSZ) + if err != nil { + return nil, 0, 0, nil, err + } bufs = append(bufs, Update{Addr: addr, Block: b}) } return &circularAppender{ diskAddrs: addrs, - }, LogPosition(start), LogPosition(end), bufs + }, LogPosition(start), LogPosition(end), bufs, nil } func (c *circularAppender) hdr1(end LogPosition) disk.Block { diff --git a/wal/0waldefs.go b/wal/0waldefs.go index 4e1ab48..af7e800 100644 --- a/wal/0waldefs.go +++ b/wal/0waldefs.go @@ -1,7 +1,7 @@ package wal import ( - "github.com/tchajed/goose/machine/disk" + "github.com/mit-pdos/go-journal/disk" "github.com/mit-pdos/go-journal/common" diff --git a/wal/installer.go b/wal/installer.go index 24dce25..e274654 100644 --- a/wal/installer.go +++ b/wal/installer.go @@ -1,7 +1,7 @@ package wal import ( - "github.com/tchajed/goose/machine/disk" + "github.com/mit-pdos/go-journal/disk" "github.com/mit-pdos/go-journal/util" ) diff --git a/wal/wal.go b/wal/wal.go index a311192..fe3df7d 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -1,18 +1,22 @@ package wal import ( + "fmt" "sync" "github.com/tchajed/goose/machine" - "github.com/tchajed/goose/machine/disk" + "github.com/mit-pdos/go-journal/disk" "github.com/mit-pdos/go-journal/common" "github.com/mit-pdos/go-journal/util" ) -func mkLog(disk disk.Disk) *Walog { - circ, start, end, memLog := recoverCircular(disk) +func mkLog(disk disk.Disk) (*Walog, error) { + circ, start, end, memLog, err := recoverCircular(disk) + if err != nil { + return nil, err + } ml := new(sync.Mutex) st := &WalogState{ memLog: mkSliding(memLog, start), @@ -30,7 +34,7 @@ func mkLog(disk disk.Disk) *Walog { condShut: sync.NewCond(ml), } util.DPrintf(1, "mkLog: size %d\n", LOGSZ) - return l + return l, nil } func (l *Walog) startBackgroundThreads() { @@ -38,10 +42,13 @@ func (l *Walog) startBackgroundThreads() { go func() { l.installer() }() } -func MkLog(disk disk.Disk) *Walog { - l := mkLog(disk) +func MkLog(disk disk.Disk) (*Walog, error) { + l, err := mkLog(disk) + if err != nil { + return nil, err + } l.startBackgroundThreads() - return l + return l, nil } // Assumes caller holds memLock @@ -92,7 +99,7 @@ func (l *Walog) ReadMem(blkno common.Bnum) (disk.Block, bool) { } // Read from only the installed state (a subset of durable state). -func (l *Walog) ReadInstalled(blkno common.Bnum) disk.Block { +func (l *Walog) ReadInstalled(blkno common.Bnum) (disk.Block, error) { return l.d.Read(blkno) } @@ -100,10 +107,10 @@ func (l *Walog) ReadInstalled(blkno common.Bnum) disk.Block { // difficult-to-linearize way (specifically, it is future-dependent when to // linearize between the l.memLog.Unlock() and the eventual disk read, due to // potential concurrent cache or disk writes). -func (l *Walog) Read(blkno common.Bnum) disk.Block { +func (l *Walog) Read(blkno common.Bnum) (disk.Block, error) { blk, ok := l.ReadMem(blkno) if ok { - return blk + return blk, nil } return l.ReadInstalled(blkno) } @@ -127,18 +134,19 @@ func (st *WalogState) memLogHasSpace(newUpdates uint64) bool { // // On failure guaranteed to be idempotent (failure can only occur in principle, // due overflowing 2^64 writes) -func (l *Walog) MemAppend(bufs []Update) (LogPosition, bool) { +func (l *Walog) MemAppend(bufs []Update) (LogPosition, error) { if uint64(len(bufs)) > LOGSZ { - return 0, false + return 0, fmt.Errorf("bufs size large than LOGSZ") } var txn LogPosition = 0 - var ok = true + var err error l.memLock.Lock() st := l.st for { if st.updatesOverflowU64(uint64(len(bufs))) { - ok = false + // ok = false + err = fmt.Errorf("updatesOverflowU64") break } if st.memLogHasSpace(uint64(len(bufs))) { @@ -154,7 +162,7 @@ func (l *Walog) MemAppend(bufs []Update) (LogPosition, bool) { continue } l.memLock.Unlock() - return txn, ok + return txn, err } // Flush flushes a transaction pos (and all preceding transactions) diff --git a/wal/wal_test.go b/wal/wal_test.go index b72ea5b..c89ce16 100644 --- a/wal/wal_test.go +++ b/wal/wal_test.go @@ -4,9 +4,9 @@ import ( "reflect" "testing" + "github.com/mit-pdos/go-journal/disk" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" - "github.com/tchajed/goose/machine/disk" "github.com/mit-pdos/go-journal/common" ) @@ -21,12 +21,16 @@ func dataBnum(x common.Bnum) common.Bnum { } func (l logWrapper) Read(bn common.Bnum) disk.Block { - return l.Walog.Read(dataBnum(bn)) + blk, err := l.Walog.Read(dataBnum(bn)) + if err != nil { + panic(err) + } + return blk } func (l logWrapper) MemAppend(txn []Update) LogPosition { - pos, ok := l.Walog.MemAppend(txn) - l.assert.Equalf(true, ok, + pos, err := l.Walog.MemAppend(txn) + l.assert.Equalf(nil, err, "mem append of %v blocks failed", len(txn)) return pos } @@ -60,7 +64,11 @@ func (l logWrapper) installOnce() { func (l *logWrapper) Restart() { l.Walog.Shutdown() d := l.Walog.d - l.Walog = mkLog(d) + var err error + l.Walog, err = mkLog(d) + if err != nil { + panic(err) + } } type WalSuite struct { @@ -71,7 +79,11 @@ type WalSuite struct { func (suite *WalSuite) SetupTest() { suite.d = disk.NewMemDisk(10000) - suite.l = logWrapper{assert: suite.Assert(), Walog: mkLog(suite.d)} + walog, err := mkLog(suite.d) + if err != nil { + panic(err) + } + suite.l = logWrapper{assert: suite.Assert(), Walog: walog} } func TestWal(t *testing.T) {