Skip to content

Commit

Permalink
Merge pull request #620 from patchlevel/stream-store
Browse files Browse the repository at this point in the history
[Experimental] Stream Store
  • Loading branch information
DavidBadura authored Jul 29, 2024
2 parents 1ff6007 + 21a31d7 commit c36ff32
Show file tree
Hide file tree
Showing 31 changed files with 3,818 additions and 85 deletions.
10 changes: 8 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,17 @@ test: phpunit
benchmark: vendor ## run benchmarks
DB_URL=sqlite3:///:memory: vendor/bin/phpbench run tests/Benchmark --report=default

.PHONY: benchmark-diff-test
benchmark-diff-test: vendor ## run benchmarks
.PHONY: benchmark-base
benchmark-base: vendor ## run benchmarks
DB_URL=sqlite3:///:memory: vendor/bin/phpbench run tests/Benchmark --revs=1 --report=default --progress=none --tag=base

.PHONY: benchmark-diff
benchmark-diff: vendor ## run benchmarks
DB_URL=sqlite3:///:memory: vendor/bin/phpbench run tests/Benchmark --revs=1 --report=diff --progress=none --ref=base

.PHONY: benchmark-diff-test
benchmark-diff-test: benchmark-base benchmark-diff ## run benchmarks

.PHONY: dev
dev: static test ## run dev tools

Expand Down
72 changes: 71 additions & 1 deletion baseline.xml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
<?xml version="1.0" encoding="UTF-8"?>
<files psalm-version="5.23.1@8471a896ccea3526b26d082f4461eeea467f10a4">
<files psalm-version="5.25.0@01a8eb06b9e9cc6cfb6a320bf9fb14331919d505">
<file src="src/Aggregate/AggregateRootBehaviour.php">
<UnsafeInstantiation>
<code><![CDATA[new static()]]></code>
Expand Down Expand Up @@ -86,6 +86,11 @@
<code><![CDATA[$dateTimeType->convertToPHPValue($data['recorded_on'], $platform)]]></code>
</MixedArgument>
</file>
<file src="src/Store/StreamDoctrineDbalStoreStream.php">
<ArgumentTypeCoercion>
<code><![CDATA[$data['playhead'] === null ? null : (int)$data['playhead']]]></code>
</ArgumentTypeCoercion>
</file>
<file src="src/Subscription/Store/DoctrineSubscriptionStore.php">
<MixedArgument>
<code><![CDATA[$context]]></code>
Expand Down Expand Up @@ -131,6 +136,14 @@
<code><![CDATA[$store]]></code>
</MissingConstructor>
</file>
<file src="tests/Benchmark/SimpleSetupStreamStoreBench.php">
<MissingConstructor>
<code><![CDATA[$multipleEventsId]]></code>
<code><![CDATA[$repository]]></code>
<code><![CDATA[$singleEventId]]></code>
<code><![CDATA[$store]]></code>
</MissingConstructor>
</file>
<file src="tests/Benchmark/SnapshotsBench.php">
<MissingConstructor>
<code><![CDATA[$adapter]]></code>
Expand Down Expand Up @@ -320,6 +333,63 @@
)]]></code>
</InternalMethod>
</file>
<file src="tests/Unit/Store/StreamDoctrineDbalStoreTest.php">
<DeprecatedMethod>
<code><![CDATA[addMethods]]></code>
</DeprecatedMethod>
<InternalMethod>
<code><![CDATA[new DefaultSelectSQLBuilder(
$abstractPlatform->reveal(),
'FOR UPDATE',
'SKIP LOCKED',
)]]></code>
<code><![CDATA[new DefaultSelectSQLBuilder(
$abstractPlatform->reveal(),
'FOR UPDATE',
'SKIP LOCKED',
)]]></code>
<code><![CDATA[new DefaultSelectSQLBuilder(
$abstractPlatform->reveal(),
'FOR UPDATE',
'SKIP LOCKED',
)]]></code>
<code><![CDATA[new DefaultSelectSQLBuilder(
$abstractPlatform->reveal(),
'FOR UPDATE',
'SKIP LOCKED',
)]]></code>
<code><![CDATA[new DefaultSelectSQLBuilder(
$abstractPlatform->reveal(),
'FOR UPDATE',
'SKIP LOCKED',
)]]></code>
<code><![CDATA[new DefaultSelectSQLBuilder(
$abstractPlatform->reveal(),
'FOR UPDATE',
'SKIP LOCKED',
)]]></code>
<code><![CDATA[new DefaultSelectSQLBuilder(
$abstractPlatform->reveal(),
'FOR UPDATE',
'SKIP LOCKED',
)]]></code>
<code><![CDATA[new DefaultSelectSQLBuilder(
$abstractPlatform->reveal(),
'FOR UPDATE',
'SKIP LOCKED',
)]]></code>
<code><![CDATA[new DefaultSelectSQLBuilder(
$abstractPlatform->reveal(),
'FOR UPDATE',
'SKIP LOCKED',
)]]></code>
<code><![CDATA[new DefaultSelectSQLBuilder(
$abstractPlatform->reveal(),
'FOR UPDATE',
'SKIP LOCKED',
)]]></code>
</InternalMethod>
</file>
<file src="tests/Unit/Subscription/Engine/DefaultSubscriptionEngineTest.php">
<PossiblyUndefinedArrayOffset>
<code><![CDATA[$update1]]></code>
Expand Down
6 changes: 6 additions & 0 deletions deptrac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ deptrac:
collectors:
- type: directory
value: src/Subscription/.*
- name: Test
collectors:
- type: directory
value: src/Test/.*

ruleset:
Aggregate:
Expand Down Expand Up @@ -175,7 +179,9 @@ deptrac:
Store:
- Aggregate
- Attribute
- Clock
- Message
- Metadata
- Schema
- Serializer
Test:
117 changes: 96 additions & 21 deletions docs/pages/store.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ Each message contains an event and the associated headers.
More information about the message can be found [here](message.md).

The store is optimized to efficiently store and load events for aggregates.
We currently only offer one [doctrine dbal](https://www.doctrine-project.org/projects/dbal.html) store.

## Create DBAL connection

Expand All @@ -29,8 +28,14 @@ $connection = DriverManager::getConnection(

## Configure Store

We currently offer two stores, both based on the [doctrine dbal](https://www.doctrine-project.org/projects/dbal.html) library.
The default store is the `DoctrineDbalStore` and the new experimental store is the `StreamDoctrineDbalStore`.

### DoctrineDbalStore

This is the current default store for event sourcing.
You can create a store with the `DoctrineDbalStore` class.
The store needs a dbal connection, an event serializer, an aggregate registry and some options.
The store needs a dbal connection, an event serializer and has some optional parameters like options.

```php
use Doctrine\DBAL\Connection;
Expand All @@ -41,21 +46,17 @@ use Patchlevel\EventSourcing\Store\DoctrineDbalStore;
$store = new DoctrineDbalStore(
$connection,
DefaultEventSerializer::createFromPaths(['src/Event']),
null,
[/** options */],
);
```
Following options are available in `DoctrineDbalStore`:

| Option | Type | Default | Description |
|-------------------|------------------|------------|----------------------------------------------|
| table_name | string | eventstore | The name of the table in the database |
| aggregate_id_type | "uuid"|"string" | uuid | The type of the `aggregate_id` column |
| locking | bool | true | If the store should use locking for writing |
| lock_id | int | 133742 | The id of the lock |
| lock_timeout | int | -1 | The timeout of the lock. -1 means no timeout |

## Schema
| Option | Type | Default | Description |
|-------------------|-----------------|------------|----------------------------------------------|
| table_name | string | eventstore | The name of the table in the database |
| aggregate_id_type | "uuid"/"string" | uuid | The type of the `aggregate_id` column |
| locking | bool | true | If the store should use locking for writing |
| lock_id | int | 133742 | The id of the lock |
| lock_timeout | int | -1 | The timeout of the lock. -1 means no timeout |

The table structure of the `DoctrineDbalStore` looks like this:

Expand All @@ -72,13 +73,59 @@ The table structure of the `DoctrineDbalStore` looks like this:
| archived | bool | If the event is archived |
| custom_headers | json | Custom headers for the event |

With the help of the `SchemaDirector`, the database structure can be created, updated and deleted.

!!! note

The default type of the `aggregate_id` column is `uuid` if the database supports it and `string` if not.
You can change the type with the `aggregate_id_type` to `string` if you want use custom id.

### StreamDoctrineDbalStore

We offer a new experimental store called `StreamDoctrineDbalStore`.
This store is decoupled from the aggregate and can be used to store events from other sources.
The difference to the `DoctrineDbalStore` is that the `StreamDoctrineDbalStore` merge the aggregate id
and the aggregate name into one column named `stream`. Additionally, the column `playhead` is nullable.
This store introduces two new methods `streams` and `remove`.

The store needs a dbal connection, an event serializer and has some optional parameters like options.

```php
use Doctrine\DBAL\Connection;
use Patchlevel\EventSourcing\Serializer\DefaultEventSerializer;
use Patchlevel\EventSourcing\Store\StreamDoctrineDbalStore;

/** @var Connection $connection */
$store = new StreamDoctrineDbalStore(
$connection,
DefaultEventSerializer::createFromPaths(['src/Event']),
);
```
Following options are available in `StreamDoctrineDbalStore`:

| Option | Type | Default | Description |
|-------------------|-----------------|-------------|----------------------------------------------|
| table_name | string | event_store | The name of the table in the database |
| locking | bool | true | If the store should use locking for writing |
| lock_id | int | 133742 | The id of the lock |
| lock_timeout | int | -1 | The timeout of the lock. -1 means no timeout |

The table structure of the `StreamDoctrineDbalStore` looks like this:

| Column | Type | Description |
|------------------|----------|--------------------------------------------------|
| id | bigint | The index of the whole stream (autoincrement) |
| stream | string | The name of the stream |
| playhead | ?int | The current playhead of the aggregate |
| event | string | The name of the event |
| payload | json | The payload of the event |
| recorded_on | datetime | The date when the event was recorded |
| new_stream_start | bool | If the event is the first event of the aggregate |
| archived | bool | If the event is archived |
| custom_headers | json | Custom headers for the event |

## Schema

With the help of the `SchemaDirector`, the database structure can be created, updated and deleted.

!!! tip

You can also use doctrine migration to create and keep your schema in sync.
Expand All @@ -92,11 +139,11 @@ Additionally, it implements the `DryRunSchemaDirector` interface, to show the sq
```php
use Doctrine\DBAL\Connection;
use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector;
use Patchlevel\EventSourcing\Store\DoctrineDbalStore;
use Patchlevel\EventSourcing\Store\Store;

/**
* @var Connection $connection
* @var DoctrineDbalStore $store
* @var Store $store
*/
$schemaDirector = new DoctrineSchemaDirector(
$connection,
Expand Down Expand Up @@ -179,13 +226,13 @@ use Doctrine\Migrations\DependencyFactory;
use Doctrine\Migrations\Provider\SchemaProvider;
use Patchlevel\EventSourcing\Schema\DoctrineMigrationSchemaProvider;
use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector;
use Patchlevel\EventSourcing\Store\DoctrineDbalStore;
use Patchlevel\EventSourcing\Store\Store;

// event sourcing schema director configuration

/**
* @var Connection $connection
* @var DoctrineDbalStore $store
* @var Store $store
*/
$schemaDirector = new DoctrineSchemaDirector(
$connection,
Expand Down Expand Up @@ -355,11 +402,39 @@ $store->save(...$messages);

Use transactional method if you want call multiple save methods in a transaction.

### Delete & Update
### Update

It is not possible to delete or update events.
It is not possible to update events.
In event sourcing, the events are immutable.

### Remove

You can remove a stream with the `remove` method.

```php
use Patchlevel\EventSourcing\Store\StreamStore;

/** @var StreamStore $store */
$store->remove('profile-*');
```
!!! note

The method is only available in the `StreamStore` like `StreamDoctrineDbalStore`.

### List Streams

You can list all streams with the `streams` method.

```php
use Patchlevel\EventSourcing\Store\StreamStore;

/** @var StreamStore $store */
$streams = $store->streams(); // ['profile-1', 'profile-2', 'profile-3']
```
!!! note

The method is only available in the `StreamStore` like `StreamDoctrineDbalStore`.

### Transaction

There is also the possibility of executing a function in a transaction.
Expand Down
4 changes: 2 additions & 2 deletions phpbench.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
"partition": "subject_name",
"cols":
{
"time-diff": "percent_diff(partition['result_time_avg'][1], partition['result_time_avg'][0])"
"time-diff": "percent_diff(coalesce(partition['result_time_avg']?[1], 0), coalesce(partition['result_time_avg']?[0], 0))"
}
},
"memory":
Expand All @@ -61,7 +61,7 @@
"partition": "subject_name",
"cols":
{
"memory-diff": "percent_diff(partition['result_mem_peak'][1], partition['result_mem_peak'][0])"
"memory-diff": "percent_diff(coalesce(partition['result_mem_peak']?[1], 0), coalesce(partition['result_mem_peak']?[0], 0))"
}
}
}
Expand Down
10 changes: 10 additions & 0 deletions phpstan-baseline.neon
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,16 @@ parameters:
count: 1
path: src/Store/DoctrineDbalStoreStream.php

-
message: "#^Parameter \\#2 \\$playhead of class Patchlevel\\\\EventSourcing\\\\Store\\\\StreamHeader constructor expects int\\<1, max\\>\\|null, int\\|null given\\.$#"
count: 1
path: src/Store/StreamDoctrineDbalStoreStream.php

-
message: "#^Ternary operator condition is always true\\.$#"
count: 1
path: src/Store/StreamDoctrineDbalStoreStream.php

-
message: "#^Parameter \\#3 \\$errorContext of class Patchlevel\\\\EventSourcing\\\\Subscription\\\\SubscriptionError constructor expects array\\<int, array\\{class\\: class\\-string, message\\: string, code\\: int\\|string, file\\: string, line\\: int, trace\\: array\\<int, array\\{file\\?\\: string, line\\?\\: int, function\\?\\: string, class\\?\\: string, type\\?\\: string, args\\?\\: array\\}\\>\\}\\>\\|null, mixed given\\.$#"
count: 1
Expand Down
5 changes: 5 additions & 0 deletions src/Aggregate/AggregateHeader.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,9 @@ public function __construct(
public readonly DateTimeImmutable $recordedOn,
) {
}

public function streamName(): string
{
return StreamNameTranslator::streamName($this->aggregateName, $this->aggregateId);
}
}
18 changes: 18 additions & 0 deletions src/Aggregate/InvalidAggregateStreamName.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Aggregate;

use RuntimeException;

use function sprintf;

/** @experimental */
final class InvalidAggregateStreamName extends RuntimeException
{
public function __construct(string $stream)
{
parent::__construct(sprintf('Invalid aggregate stream name "%s". Expected format is "[aggregateName]-[aggregateId]".', $stream));
}
}
Loading

0 comments on commit c36ff32

Please sign in to comment.