Skip to content

Commit

Permalink
wip - fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
rianhughes committed Jan 7, 2025
1 parent 031de6c commit 484570e
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 16 deletions.
31 changes: 21 additions & 10 deletions mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (p *Pool) dbWriter() {
// loadFromDB restores the in-memory transaction pool from the database
func (p *Pool) loadFromDB() error {
return p.db.View(func(txn db.Transaction) error {
var headValue *felt.Felt
headValue := new(felt.Felt)
err := p.headHash(txn, headValue)
if err != nil {
if errors.Is(err, db.ErrKeyNotFound) {
Expand All @@ -102,7 +102,7 @@ func (p *Pool) loadFromDB() error {

currentHash := headValue
for currentHash != nil {
curElem, err := p.elem(txn, currentHash)
curElem, err := p.dbElem(txn, currentHash)
if err != nil {
return err
}
Expand All @@ -112,7 +112,7 @@ func (p *Pool) loadFromDB() error {
}

if curElem.NextHash != nil {
nxtElem, err := p.elem(txn, curElem.NextHash)
nxtElem, err := p.dbElem(txn, curElem.NextHash)
if err != nil {
return err
}
Expand Down Expand Up @@ -149,7 +149,7 @@ func (p *Pool) handleTransaction(userTxn *BroadcastedTransaction) error {
tailValue = nil
}

if err := p.putElem(dbTxn, userTxn.Transaction.Hash(), &storageElem{
if err := p.putdbElem(dbTxn, userTxn.Transaction.Hash(), &storageElem{
Txn: *userTxn,
}); err != nil {
return err
Expand All @@ -158,12 +158,12 @@ func (p *Pool) handleTransaction(userTxn *BroadcastedTransaction) error {
if tailValue != nil {
// Update old tail to point to the new item
var oldTailElem storageElem
oldTailElem, err := p.elem(dbTxn, tailValue)
oldTailElem, err := p.dbElem(dbTxn, tailValue)
if err != nil {
return err
}
oldTailElem.NextHash = userTxn.Transaction.Hash()
if err = p.putElem(dbTxn, tailValue, &oldTailElem); err != nil {
if err = p.putdbElem(dbTxn, tailValue, &oldTailElem); err != nil {
return err
}
} else {
Expand Down Expand Up @@ -193,8 +193,6 @@ func (p *Pool) Push(userTxn *BroadcastedTransaction) error {

// todo(rian this PR): validation

// p.handleTransaction(userTxn)

// todo: should db overloading block the in-memory mempool??
select {
case p.dbWriteChan <- userTxn:
Expand Down Expand Up @@ -295,6 +293,19 @@ func (p *Pool) headHash(txn db.Transaction, head *felt.Felt) error {
})
}

func (p *Pool) HeadHash() (*felt.Felt, error) {
txn, err := p.db.NewTransaction(false)
if err != nil {
return nil, err
}
var head *felt.Felt
err = txn.Get([]byte(headKey), func(b []byte) error {
head = new(felt.Felt).SetBytes(b)
return nil
})
return head, err
}

func (p *Pool) updateHead(txn db.Transaction, head *felt.Felt) error {
return txn.Set([]byte(headKey), head.Marshal())
}
Expand All @@ -310,15 +321,15 @@ func (p *Pool) updateTail(txn db.Transaction, tail *felt.Felt) error {
return txn.Set([]byte(tailKey), tail.Marshal())
}

func (p *Pool) elem(txn db.Transaction, itemKey *felt.Felt) (storageElem, error) {
func (p *Pool) dbElem(txn db.Transaction, itemKey *felt.Felt) (storageElem, error) {
var item storageElem
err := txn.Get(itemKey.Marshal(), func(b []byte) error {
return encoder.Unmarshal(b, &item)
})
return item, err
}

func (p *Pool) putElem(txn db.Transaction, itemKey *felt.Felt, item *storageElem) error {
func (p *Pool) putdbElem(txn db.Transaction, itemKey *felt.Felt, item *storageElem) error {
itemBytes, err := encoder.Marshal(item)
if err != nil {
return err
Expand Down
23 changes: 17 additions & 6 deletions mempool/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,24 +120,35 @@ func TestRestoreMempool(t *testing.T) {
}))
assert.Equal(t, uint16(i), pool.Len())
}

// Todo: reads should block??
fmt.Println(pool.HeadHash())
time.Sleep(100 * time.Millisecond)
fmt.Println(pool.HeadHash())
lenDB, err = pool.LenDB()
require.NoError(t, err)
assert.Equal(t, uint16(3), lenDB)

// Close the mempool
require.NoError(t, closer())
fmt.Println("sdfsdfdf")
_, closer2, err := mempool.New(*testDB, 1024)

poolRestored, closer2, err := mempool.New(*testDB, 1024)
require.NoError(t, err)
fmt.Println("sdfsdfdsssf")
lenDB, err = pool.LenDB()
lenDB, err = poolRestored.LenDB()
require.NoError(t, err)
assert.Equal(t, uint16(3), lenDB)
assert.Equal(t, uint16(3), poolRestored.Len())
fmt.Println(poolRestored.HeadHash())

// Remove transactions
_, err = poolRestored.Pop()
require.NoError(t, err)
_, err = poolRestored.Pop()
require.NoError(t, err)
lenDB, err = poolRestored.LenDB()
assert.Equal(t, uint16(3), lenDB)
assert.Equal(t, uint16(3), pool.Len())
assert.Equal(t, uint16(1), poolRestored.Len())

// // Remove transaction
closer2()
}

Expand Down

0 comments on commit 484570e

Please sign in to comment.