Skip to content

Commit

Permalink
Add query interceptor API
Browse files Browse the repository at this point in the history
  • Loading branch information
simolus3 committed Nov 7, 2023
1 parent de7e1ce commit 446832c
Show file tree
Hide file tree
Showing 6 changed files with 281 additions and 2 deletions.
4 changes: 4 additions & 0 deletions drift/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 2.14.0-dev

- Add the `QueryInterceptor` to easily monitor all database calls made by drift.

## 2.13.1

- Fix a bug when running batches over serialized communication channels.
Expand Down
1 change: 1 addition & 0 deletions drift/lib/drift.dart
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ export 'src/runtime/data_verification.dart';
export 'src/runtime/exceptions.dart';
export 'src/runtime/executor/connection_pool.dart';
export 'src/runtime/executor/executor.dart';
export 'src/runtime/executor/interceptor.dart';
export 'src/runtime/query_builder/query_builder.dart'
hide CaseWhenExpressionWithBase, BaseCaseWhenExpression;
export 'src/runtime/types/converters.dart';
Expand Down
178 changes: 178 additions & 0 deletions drift/lib/src/runtime/executor/interceptor.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
import '../api/runtime_api.dart';
import '../query_builder/query_builder.dart';
import 'executor.dart';

/// Extension to wrap a [QueryExecutor] with a [QueryInterceptor].
extension ApplyInterceptor on QueryExecutor {
/// Returns a [QueryExecutor] that will use `this` executor internally, but
/// with calls intercepted by the given [interceptor].
///
/// This can be used to, for instance, write a custom statement logger or to
/// retry failing statements automatically.
QueryExecutor interceptWith(QueryInterceptor interceptor) {
final $this = this;

if ($this is TransactionExecutor) {
return _InterceptedTransactionExecutor($this, interceptor);
} else {
return _InterceptedExecutor($this, interceptor);
}
}
}

/// Extension to wrap a [DatabaseConnection] with a [QueryInterceptor].
extension ApplyInterceptorConnection on DatabaseConnection {
/// Returns a [DatabaseConnection] that will use the same stream queries as
/// `this`, but replaces its executor by wrapping it with the [interceptor].
///
/// See also: [ApplyInterceptor.interceptWith].
DatabaseConnection interceptWith(QueryInterceptor interceptor) {
return withExecutor(executor.interceptWith(interceptor));
}
}

/// An interceptor for SQL queries.
///
/// This wraps an existing [QueryExecutor] implemented by drift, and by default
/// does nothing. However, specific methods can be overridden to customize the
/// behavior of an existing query executor.
abstract class QueryInterceptor {
/// Intercept [QueryExecutor.dialect] calls.
SqlDialect dialect(QueryExecutor executor) => executor.dialect;

/// Intercept [QueryExecutor.beginTransaction] calls.
TransactionExecutor beginTransaction(QueryExecutor parent) =>
parent.beginTransaction();

/// Intercept [TransactionExecutor.supportsNestedTransactions] calls.
bool transactionCanBeNested(TransactionExecutor inner) {
return inner.supportsNestedTransactions;
}

/// Intercept [QueryExecutor.close] calls.
Future<void> close(QueryExecutor inner) => inner.close();

/// Intercept [TransactionExecutor.send] calls.
Future<void> commitTransaction(TransactionExecutor inner) {
return inner.send();
}

/// Intercept [TransactionExecutor.rollback] calls.
Future<void> rollbackTransaction(TransactionExecutor inner) {
return inner.rollback();
}

/// Intercept [QueryExecutor.ensureOpen] calls.
Future<bool> ensureOpen(QueryExecutor executor, QueryExecutorUser user) =>
executor.ensureOpen(user);

/// Intercept [QueryExecutor.runBatched] calls.
Future<void> runBatched(
QueryExecutor executor, BatchedStatements statements) {
return executor.runBatched(statements);
}

/// Intercept [QueryExecutor.runCustom] calls.
Future<void> runCustom(
QueryExecutor executor, String statement, List<Object?> args) {
return executor.runCustom(statement, args);
}

/// Intercept [QueryExecutor.runInsert] calls.
Future<int> runInsert(
QueryExecutor executor, String statement, List<Object?> args) {
return executor.runInsert(statement, args);
}

/// Intercept [QueryExecutor.runDelete] calls.
Future<int> runDelete(
QueryExecutor executor, String statement, List<Object?> args) {
return executor.runDelete(statement, args);
}

/// Intercept [QueryExecutor.runUpdate] calls.
Future<int> runUpdate(
QueryExecutor executor, String statement, List<Object?> args) {
return executor.runUpdate(statement, args);
}

/// Intercept [QueryExecutor.runSelect] calls.
Future<List<Map<String, Object?>>> runSelect(
QueryExecutor executor, String statement, List<Object?> args) {
return executor.runSelect(statement, args);
}
}

class _InterceptedExecutor extends QueryExecutor {
final QueryExecutor _inner;
final QueryInterceptor _interceptor;

_InterceptedExecutor(this._inner, this._interceptor);

@override
TransactionExecutor beginTransaction() => _InterceptedTransactionExecutor(
_interceptor.beginTransaction(_inner), _interceptor);

@override
SqlDialect get dialect => _interceptor.dialect(_inner);

@override
Future<bool> ensureOpen(QueryExecutorUser user) {
return _interceptor.ensureOpen(_inner, user);
}

@override
Future<void> runBatched(BatchedStatements statements) {
return _interceptor.runBatched(_inner, statements);
}

@override
Future<void> runCustom(String statement, [List<Object?>? args]) {
return _interceptor.runCustom(_inner, statement, args ?? const []);
}

@override
Future<int> runDelete(String statement, List<Object?> args) {
return _interceptor.runDelete(_inner, statement, args);
}

@override
Future<int> runInsert(String statement, List<Object?> args) {
return _interceptor.runInsert(_inner, statement, args);
}

@override
Future<List<Map<String, Object?>>> runSelect(
String statement, List<Object?> args) {
return _interceptor.runSelect(_inner, statement, args);
}

@override
Future<int> runUpdate(String statement, List<Object?> args) {
return _interceptor.runUpdate(_inner, statement, args);
}

@override
Future<void> close() {
return _interceptor.close(_inner);
}
}

class _InterceptedTransactionExecutor extends _InterceptedExecutor
implements TransactionExecutor {
_InterceptedTransactionExecutor(super.inner, super.interceptor);

@override
Future<void> rollback() {
return _interceptor.rollbackTransaction(_inner as TransactionExecutor);
}

@override
Future<void> send() {
return _interceptor.commitTransaction(_inner as TransactionExecutor);
}

@override
bool get supportsNestedTransactions =>
_interceptor.transactionCanBeNested(_inner as TransactionExecutor);
}
2 changes: 1 addition & 1 deletion drift/pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name: drift
description: Drift is a reactive library to store relational data in Dart and Flutter applications.
version: 2.13.1
version: 2.14.0-dev
repository: https://github.com/simolus3/drift
homepage: https://drift.simonbinder.eu/
issue_tracker: https://github.com/simolus3/drift/issues
Expand Down
96 changes: 96 additions & 0 deletions drift/test/engines/interceptor_test.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import 'dart:async';

import 'package:drift/drift.dart';
import 'package:test/test.dart';

import '../generated/todos.dart';
import '../test_utils/test_utils.dart';

void main() {
test('calls interceptor methods', () async {
final interceptor = EmittingInterceptor();
final events = <String>[];
interceptor.events.stream.listen(events.add);

final database = TodoDb(testInMemoryDatabase().interceptWith(interceptor));
expect(await database.categories.select().get(), isEmpty);
expect(events, ['select']);

await database.batch((batch) {
batch.insert(database.categories,
CategoriesCompanion.insert(description: 'from batch'));
});
expect(events, ['select', 'begin', 'batched', 'commit']);
events.clear();

await database.users.insertOne(
UsersCompanion.insert(name: 'Simon B', profilePicture: Uint8List(0)));
await database.users.update().write(UsersCompanion(isAwesome: Value(true)));
await database.users.delete().go();
expect(events, ['insert', 'update', 'delete']);
});
}

class EmittingInterceptor extends QueryInterceptor {
final events = StreamController<String>();

@override
TransactionExecutor beginTransaction(QueryExecutor parent) {
events.add('begin');
return super.beginTransaction(parent);
}

@override
Future<void> commitTransaction(TransactionExecutor inner) {
events.add('commit');
return super.commitTransaction(inner);
}

@override
Future<void> rollbackTransaction(TransactionExecutor inner) {
events.add('rollback');
return super.rollbackTransaction(inner);
}

@override
Future<void> runBatched(
QueryExecutor executor, BatchedStatements statements) {
events.add('batched');
return super.runBatched(executor, statements);
}

@override
Future<void> runCustom(
QueryExecutor executor, String statement, List<Object?> args) {
events.add('custom');
return super.runCustom(executor, statement, args);
}

@override
Future<int> runInsert(
QueryExecutor executor, String statement, List<Object?> args) {
events.add('insert');
return super.runInsert(executor, statement, args);
}

@override
Future<int> runDelete(
QueryExecutor executor, String statement, List<Object?> args) {
events.add('delete');
return super.runDelete(executor, statement, args);
}

@override
Future<int> runUpdate(
QueryExecutor executor, String statement, List<Object?> args) {
events.add('update');
return super.runUpdate(executor, statement, args);
}

@override
Future<List<Map<String, Object?>>> runSelect(
QueryExecutor executor, String statement, List<Object?> args) {
events.add('select');
return super.runSelect(executor, statement, args);
}
}
2 changes: 1 addition & 1 deletion drift_dev/pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ dependencies:
io: ^1.0.3

# Drift-specific analysis and apis
drift: '>=2.13.0 <2.14.0'
drift: '>=2.14.0 <2.15.0'
sqlite3: '>=0.1.6 <3.0.0'
sqlparser: '^0.32.0'

Expand Down

0 comments on commit 446832c

Please sign in to comment.