Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync2: dbset: use single connection for each sync session #6446

Open
wants to merge 8 commits into
base: develop
Choose a base branch
from
118 changes: 98 additions & 20 deletions sql/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"errors"
"fmt"
"maps"
"math/rand/v2"
"net/url"
"os"
"strings"
Expand Down Expand Up @@ -223,8 +224,12 @@

// OpenInMemory creates an in-memory database.
func OpenInMemory(opts ...Opt) (*sqliteDatabase, error) {
opts = append(opts, WithConnections(1), withForceFresh())
return Open("file::memory:?mode=memory", opts...)
opts = append(opts, withForceFresh())
// Unique uri is needed to avoid sharing the same in-memory database,
// while allowing multiple connections to the same database.
uri := fmt.Sprintf("file:mem-%d-%d?mode=memory&cache=shared",
rand.Uint64(), rand.Uint64())
return Open(uri, opts...)
}

// InMemory creates an in-memory database for testing and panics if
Expand Down Expand Up @@ -572,21 +577,68 @@
type Database interface {
Executor
QueryCache
// Close closes the database.
Close() error
// QueryCount returns the number of queries executed on the database.
QueryCount() int
// QueryCache returns the query cache for this database, if it's present,
// or nil otherwise.
QueryCache() QueryCache
// Tx creates deferred sqlite transaction.
//
// Deferred transactions are not started until the first statement.
// Transaction may be started in read mode and automatically upgraded to write mode
// after one of the write statements.
//
// https://www.sqlite.org/lang_transaction.html
Tx(ctx context.Context) (Transaction, error)
// WithTx starts a new transaction and passes it to the exec function.
// It then commits the transaction if the exec function doesn't return an error,
// and rolls it back otherwise.
// If the context is canceled, the currently running SQL statement is interrupted.
WithTx(ctx context.Context, exec func(Transaction) error) error
// TxImmediate begins a new immediate transaction on the database, that is,
// a transaction that starts a write immediately without waiting for a write
// statement.
// The transaction returned from this function must always be released by calling
// its Release method. Release rolls back the transaction if it hasn't been
// committed.
// If the context is canceled, the currently running SQL statement is interrupted.
TxImmediate(ctx context.Context) (Transaction, error)
// WithTxImmediate starts a new immediate transaction and passes it to the exec
// function.
// An immediate transaction is started immediately, without waiting for a write
// statement.
// It then commits the transaction if the exec function doesn't return an error,
// and rolls it back otherwise.
// If the context is canceled, the currently running SQL statement is interrupted.
WithTxImmediate(ctx context.Context, exec func(Transaction) error) error
// WithConnection executes the provided function with a connection from the
// database pool.
// If many queries are to be executed in a row, but there's no need for an
// explicit transaction which may be long-running and thus block
// WAL checkpointing, it may be preferable to use a single connection for
// it to avoid database pool overhead.
// The connection is released back to the pool after the function returns.
// If the context is canceled, the currently running SQL statement is interrupted.
WithConnection(ctx context.Context, exec func(Executor) error) error
// Intercept adds an interceptor function to the database. The interceptor
// functions are invoked upon each query on the database, including queries
// executed within transactions.
// The query will fail if the interceptor returns an error.
// The interceptor can later be removed using RemoveInterceptor with the same key.
Intercept(key string, fn Interceptor)
// RemoveInterceptor removes the interceptor function with specified key from the database.
RemoveInterceptor(key string)
}

// Transaction represents a transaction.
type Transaction interface {
Executor
// Commit commits the transaction.
Commit() error
// Release releases the transaction. If the transaction hasn't been committed,
// it's rolled back.
Release() error
}

Expand Down Expand Up @@ -684,34 +736,22 @@
return nil
}

// Tx creates deferred sqlite transaction.
//
// Deferred transactions are not started until the first statement.
// Transaction may be started in read mode and automatically upgraded to write mode
// after one of the write statements.
//
// https://www.sqlite.org/lang_transaction.html
// Tx implements Database.
func (db *sqliteDatabase) Tx(ctx context.Context) (Transaction, error) {
return db.getTx(ctx, beginDefault)
}

// WithTx will pass initialized deferred transaction to exec callback.
// Will commit only if error is nil.
// WithTx implements Database.
func (db *sqliteDatabase) WithTx(ctx context.Context, exec func(Transaction) error) error {
return db.withTx(ctx, beginDefault, exec)
}

// TxImmediate creates immediate transaction.
//
// IMMEDIATE cause the database connection to start a new write immediately, without waiting
// for a write statement. The BEGIN IMMEDIATE might fail with SQLITE_BUSY if another write
// transaction is already active on another database connection.
// TxImmediate implements Database.
func (db *sqliteDatabase) TxImmediate(ctx context.Context) (Transaction, error) {
return db.getTx(ctx, beginImmediate)
}

// WithTxImmediate will pass initialized immediate transaction to exec callback.
// Will commit only if error is nil.
// WithTxImmediate implements Database.
func (db *sqliteDatabase) WithTxImmediate(ctx context.Context, exec func(Transaction) error) error {
return db.withTx(ctx, beginImmediate, exec)
}
Expand All @@ -727,7 +767,7 @@
return nil
}

// Exec statement using one of the connection from the pool.
// Exec implements Executor.
//
// If you care about atomicity of the operation (for example writing rewards to multiple accounts)
// Tx should be used. Otherwise sqlite will not guarantee that all side-effects of operations are
Expand Down Expand Up @@ -758,7 +798,7 @@
return exec(conn, query, encoder, decoder)
}

// Close closes all pooled connections.
// Close implements Database.
func (db *sqliteDatabase) Close() error {
db.closeMux.Lock()
defer db.closeMux.Unlock()
Expand All @@ -772,6 +812,23 @@
return nil
}

// WithConnection implements Database.
func (db *sqliteDatabase) WithConnection(ctx context.Context, exec func(Executor) error) error {
if db.closed {
return ErrClosed
}

Check warning on line 819 in sql/database.go

View check run for this annotation

Codecov / codecov/patch

sql/database.go#L818-L819

Added lines #L818 - L819 were not covered by tests
conCtx, cancel := context.WithCancel(ctx)
conn := db.getConn(conCtx)
defer func() {
cancel()
db.pool.Put(conn)
}()
if conn == nil {
return ErrNoConnection
}

Check warning on line 828 in sql/database.go

View check run for this annotation

Codecov / codecov/patch

sql/database.go#L827-L828

Added lines #L827 - L828 were not covered by tests
return exec(&sqliteConn{queryCache: db.queryCache, db: db, conn: conn})
}

// Intercept adds an interceptor function to the database. The interceptor functions
// are invoked upon each query. The query will fail if the interceptor returns an error.
// The interceptor can later be removed using RemoveInterceptor with the same key.
Expand Down Expand Up @@ -1093,6 +1150,27 @@
return exec(tx.conn, query, encoder, decoder)
}

type sqliteConn struct {
*queryCache
db *sqliteDatabase
conn *sqlite.Conn
}

func (c *sqliteConn) Exec(query string, encoder Encoder, decoder Decoder) (int, error) {
if err := c.db.runInterceptors(query); err != nil {
return 0, fmt.Errorf("running query interceptors: %w", err)
}

Check warning on line 1162 in sql/database.go

View check run for this annotation

Codecov / codecov/patch

sql/database.go#L1161-L1162

Added lines #L1161 - L1162 were not covered by tests

c.db.queryCount.Add(1)
if c.db.latency != nil {
start := time.Now()
defer func() {
c.db.latency.WithLabelValues(query).Observe(float64(time.Since(start)))
}()

Check warning on line 1169 in sql/database.go

View check run for this annotation

Codecov / codecov/patch

sql/database.go#L1166-L1169

Added lines #L1166 - L1169 were not covered by tests
}
return exec(c.conn, query, encoder, decoder)
}

func mapSqliteError(err error) error {
switch sqlite.ErrCode(err) {
case sqlite.SQLITE_CONSTRAINT_PRIMARYKEY, sqlite.SQLITE_CONSTRAINT_UNIQUE:
Expand Down
21 changes: 21 additions & 0 deletions sql/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -638,3 +638,24 @@ func TestExclusive(t *testing.T) {
})
}
}

func TestConnection(t *testing.T) {
db := InMemoryTest(t)
var r int
require.NoError(t, db.WithConnection(context.Background(), func(ex Executor) error {
n, err := ex.Exec("select ?", func(stmt *Statement) {
stmt.BindInt64(1, 42)
}, func(stmt *Statement) bool {
r = stmt.ColumnInt(0)
return true
})
require.NoError(t, err)
require.Equal(t, 1, n)
require.Equal(t, 42, r)
return nil
}))

require.Error(t, db.WithConnection(context.Background(), func(Executor) error {
return errors.New("error")
}))
}
4 changes: 4 additions & 0 deletions sql/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,17 @@ import "go.uber.org/zap"

// Executor is an interface for executing raw statement.
type Executor interface {
// Exec executes a statement.
Exec(string, Encoder, Decoder) (int, error)
}

// Migration is interface for migrations provider.
type Migration interface {
// Apply applies the migration.
Apply(db Executor, logger *zap.Logger) error
// Name returns the name of the migration.
Rollback() error
Name() string
// Order returns the sequential number of the migration.
Order() int
}
9 changes: 6 additions & 3 deletions sql/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,17 @@ func LoadDBSchemaScript(db Executor) (string, error) {
return "", err
}
fmt.Fprintf(&sb, "PRAGMA user_version = %d;\n", version)
// The following SQL query ensures that tables are listed first,
// ordered by name, and then all other objects, ordered by their table name
// and then by their own name.
if _, err = db.Exec(`
SELECT tbl_name, sql || ';'
FROM sqlite_master
WHERE sql IS NOT NULL AND tbl_name NOT LIKE 'sqlite_%'
ORDER BY
CASE WHEN type = 'table' THEN 1 ELSE 2 END, -- ensures tables are first
tbl_name, -- tables are sorted by name, then all other objects
name -- (indexes, triggers, etc.) also by name
CASE WHEN type = 'table' THEN 1 ELSE 2 END,
tbl_name,
name
`, nil, func(st *Statement) bool {
fmt.Fprintln(&sb, st.ColumnText(1))
return true
Expand Down
46 changes: 32 additions & 14 deletions sync2/dbset/dbset.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package dbset

import (
"context"
"fmt"
"maps"
"sync"
Expand Down Expand Up @@ -49,7 +50,16 @@
return true
}

// Loaded returns true if the DBSet is loaded.
// Implements rangesync.OrderedSet.
func (d *DBSet) Loaded() bool {
d.loadMtx.Lock()
defer d.loadMtx.Unlock()
return d.ft != nil

Check warning on line 58 in sync2/dbset/dbset.go

View check run for this annotation

Codecov / codecov/patch

sync2/dbset/dbset.go#L55-L58

Added lines #L55 - L58 were not covered by tests
}

// EnsureLoaded ensures that the DBSet is loaded and ready to be used.
// Implements rangesync.OrderedSet.
func (d *DBSet) EnsureLoaded() error {
d.loadMtx.Lock()
defer d.loadMtx.Unlock()
Expand All @@ -65,7 +75,7 @@
if err != nil {
return fmt.Errorf("error loading count: %w", err)
}
d.dbStore = fptree.NewDBBackedStore(d.db, d.snapshot, count, d.keyLen)
d.dbStore = fptree.NewDBBackedStore(d.db, d.snapshot, d.keyLen)
d.ft = fptree.NewFPTree(count, d.dbStore, d.keyLen, d.maxDepth)
return d.snapshot.Load(d.db, d.handleIDfromDB)
}
Expand Down Expand Up @@ -217,28 +227,39 @@
return d.snapshot.LoadSinceSnapshot(d.db, oldSnapshot, d.handleIDfromDB)
}

// Copy creates a copy of the DBSet.
// WithCopy invokes the specified function, passing it a temporary copy of the DBSet.
// Implements rangesync.OrderedSet.
func (d *DBSet) Copy(syncScope bool) rangesync.OrderedSet {
d.loadMtx.Lock()
defer d.loadMtx.Unlock()
if d.ft == nil {
// FIXME
panic("BUG: can't copy the DBItemStore before it's loaded")
func (d *DBSet) WithCopy(ctx context.Context, toCall func(rangesync.OrderedSet) error) error {
if err := d.EnsureLoaded(); err != nil {
return fmt.Errorf("loading DBSet: %w", err)

Check warning on line 234 in sync2/dbset/dbset.go

View check run for this annotation

Codecov / codecov/patch

sync2/dbset/dbset.go#L234

Added line #L234 was not covered by tests
}
d.loadMtx.Lock()
ft := d.ft.Clone().(*fptree.FPTree)
return &DBSet{
ds := &DBSet{
db: d.db,
ft: ft,
st: d.st,
snapshot: d.snapshot,
keyLen: d.keyLen,
maxDepth: d.maxDepth,
dbStore: d.dbStore,
received: maps.Clone(d.received),
}
d.loadMtx.Unlock()
defer ds.release()
db, ok := d.db.(sql.Database)
if ok {
return db.WithConnection(ctx, func(ex sql.Executor) error {
ds.db = ex
return toCall(ds)
})
} else {
return toCall(ds)
}
}

// Has returns true if the DBSet contains the given item.
// Implements rangesync.OrderedSet.
func (d *DBSet) Has(k rangesync.KeyBytes) (bool, error) {
if err := d.EnsureLoaded(); err != nil {
return false, err
Expand All @@ -258,17 +279,14 @@
}

// Recent returns a sequence of items that have been added to the DBSet since the given time.
// Implements rangesync.OrderedSet.
func (d *DBSet) Recent(since time.Time) (rangesync.SeqResult, int) {
return d.dbStore.Since(make(rangesync.KeyBytes, d.keyLen), since.UnixNano())
}

// Release releases resources associated with the DBSet.
func (d *DBSet) Release() error {
d.loadMtx.Lock()
defer d.loadMtx.Unlock()
func (d *DBSet) release() {
if d.ft != nil {
d.ft.Release()
d.ft = nil
}
return nil
}
Loading
Loading