Skip to content

Commit

Permalink
Testing Event Sourcing applications (#66)
Browse files Browse the repository at this point in the history
* Testing Event Sourcing applications

* clean up

* set up events

* Fix tests

* fix tests
  • Loading branch information
dgafka authored Dec 15, 2022
1 parent 44bb8a3 commit a65fbda
Show file tree
Hide file tree
Showing 27 changed files with 332 additions and 99 deletions.
2 changes: 2 additions & 0 deletions .env.dist
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
XDEBUG_ENABLED="0"
PHP_IDE_CONFIG="serverName=ecotone"
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ composer.lock
.php-cs-fixer.cache
cache
auth.json
config.json
config.json
.env
9 changes: 8 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ Read more on the [Blog](https://blog.ecotone.tech).

# Development

Start docker containers
Copy `.env.dist` to `.env` and start docker containers

```php
docker-compose up -d
Expand All @@ -44,6 +44,13 @@ Clear environment
docker-compose down
```

Debugging code with Xdebug.
To have enabled Xdebug all the time, change line in your .env file to `XDEBUG_ENABLED="1"`
To enable xdebug conditionally for given test run:
```php
docker exec -it ecotone_development xdebug vendor/bin/phpunit --filter test_calling_command_on_aggregate_and_receiving_aggregate_instance
```

## Feature requests and issue reporting

Use [issue tracking system](https://github.com/ecotoneframework/ecotone/issues) for new feature request and bugs.
Expand Down
2 changes: 2 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ services:
RABBIT_HOST: "amqp://rabbitmq:5672"
DATABASE_DSN: pgsql://ecotone:secret@database:5432/ecotone
DATABASE_MYSQL: mysql://ecotone:secret@database-mysql:3306/ecotone
env_file:
- ".env"
database:
image: postgres:12.1
networks:
Expand Down
11 changes: 8 additions & 3 deletions packages/Ecotone/src/Lite/EcotoneLite.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
use Ecotone\Messaging\InMemoryConfigurationVariableService;
use Ecotone\Messaging\Support\Assert;
use Ecotone\Modelling\Attribute\Aggregate;
use Ecotone\Modelling\Attribute\EventSourcingAggregate;
use Ecotone\Modelling\BaseEventSourcingConfiguration;
use Ecotone\Modelling\Config\RegisterAggregateRepositoryChannels;
use Psr\Container\ContainerInterface;

final class EcotoneLite
Expand Down Expand Up @@ -211,11 +213,14 @@ private static function prepareForFlowTesting(?ServiceConfiguration $configurati
->withSkippedModulePackageNames($packagesToSkip);
}

$aggregate = TypeDescriptor::create(Aggregate::class);
$aggregateAnnotation = TypeDescriptor::create(Aggregate::class);
foreach ($classesToResolve as $class) {
if (ClassDefinition::createFor(TypeDescriptor::create($class))->hasClassAnnotation($aggregate)) {
$testConfiguration = $testConfiguration->addAggregateUnderTest($class);
$aggregateClass = ClassDefinition::createFor(TypeDescriptor::create($class));
if (!$aggregateClass->hasClassAnnotation($aggregateAnnotation)) {
continue;
}

$configuration = $configuration->addExtensionObject(new RegisterAggregateRepositoryChannels($aggregateClass->getClassType()->toString(), $aggregateClass->getSingleClassAnnotation($aggregateAnnotation) instanceof EventSourcingAggregate));
}

$configuration = $configuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
use Ecotone\Messaging\Handler\ServiceActivator\ServiceActivatorBuilder;
use Ecotone\Messaging\Precedence;
use Ecotone\Modelling\CommandBus;
use Ecotone\Modelling\Config\RegisterLoadAggregateChannel;
use Ecotone\Modelling\Config\RegisterAggregateRepositoryChannels;
use Ecotone\Modelling\EventBus;
use Ecotone\Modelling\QueryBus;

Expand Down Expand Up @@ -283,16 +283,4 @@ private function registerMessageCollector(MessageCollectorHandler $messageCollec
QueryBus::class
));
}

public function getModuleExtensions(array $serviceExtensions): array
{
$testConfiguration = ExtensionObjectResolver::resolveUnique(TestConfiguration::class, $serviceExtensions, TestConfiguration::createWithDefaults());

$aggregatesToLoad = [];
foreach ($testConfiguration->getAggregatesAndSagasUnderTest() as $aggregateOrSaga) {
$aggregatesToLoad[] = new RegisterLoadAggregateChannel($aggregateOrSaga);
}

return $aggregatesToLoad;
}
}
37 changes: 36 additions & 1 deletion packages/Ecotone/src/Lite/Test/FlowTestSupport.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
use Ecotone\Modelling\Event;
use Ecotone\Modelling\EventBus;
use Ecotone\Modelling\QueryBus;
use Ramsey\Uuid\UuidInterface;

/**
* @template T
Expand Down Expand Up @@ -99,13 +100,47 @@ public function run(string $name, ?ExecutionPollingMetadata $executionPollingMet
/**
* @param Event[]|object[]|array[] $streamEvents
*/
public function appendToEventStore(string $streamName, array $events): self
public function withEventStream(string $streamName, array $events): self
{
$this->getGateway(EventStore::class)->appendTo($streamName, $events);

return $this;
}

/**
* @param Event[]|object[]|array[] $streamEvents
*/
public function withEventsFor(string|object|array $identifiers, string $aggregateClass, array $events, int $aggregateVersion = 0): self
{
$this->messagingEntrypoint->sendWithHeaders(
$events,
[
AggregateMessage::OVERRIDE_AGGREGATE_IDENTIFIER => is_object($identifiers) ? (string)$identifiers : $identifiers,
AggregateMessage::TARGET_VERSION => $aggregateVersion,
AggregateMessage::AGGREGATE_OBJECT => $aggregateClass
],
ModellingHandlerModule::getRegisterAggregateSaveRepositoryInputChannel($aggregateClass)
);

return $this;
}

/**
* @param Event[]|object[]|array[] $streamEvents
*/
public function withStateFor(object $aggregate): self
{
$this->messagingEntrypoint->sendWithHeaders(
$aggregate,
[
AggregateMessage::AGGREGATE_OBJECT => $aggregate
],
ModellingHandlerModule::getRegisterAggregateSaveRepositoryInputChannel($aggregate::class)
);

return $this;
}

public function triggerProjection(string $projectionName): self
{
$this->getGateway(ProjectionManager::class)->triggerProjection($projectionName);
Expand Down
29 changes: 12 additions & 17 deletions packages/Ecotone/src/Lite/Test/TestConfiguration.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,19 @@
namespace Ecotone\Lite\Test;

use Ecotone\Messaging\Conversion\MediaType;
use Ecotone\Messaging\Handler\ClassDefinition;
use Ecotone\Messaging\Handler\TypeDescriptor;
use Ecotone\Messaging\Support\Assert;
use Ecotone\Modelling\Attribute\Aggregate;
use Ecotone\Modelling\Attribute\EventSourcingAggregate;
use Ecotone\Modelling\Config\RegisterAggregateRepositoryChannels;

final class TestConfiguration
{
/**
* @param string[] $spiedChannelNames
* @param RegisterAggregateRepositoryChannels[] $relatedAggregates
*/
private function __construct(
private bool $failOnCommandHandlerNotFound,
private bool $failOnQueryHandlerNotFound,
Expand Down Expand Up @@ -63,23 +72,6 @@ public function withSpyOnChannel(string $channelName): self
return $self;
}

public function addAggregateUnderTest(string $aggregateClassName): self
{
if (in_array($aggregateClassName, $this->relatedAggregates)) {
return $this;
}

$self = clone $this;
$self->relatedAggregates[] = $aggregateClassName;

return $self;
}

public function addSagaUnderTest(string $sagaClassName): self
{
return $this->addAggregateUnderTest($sagaClassName);
}

public function isFailingOnCommandHandlerNotFound(): bool
{
return $this->failOnCommandHandlerNotFound;
Expand All @@ -105,6 +97,9 @@ public function getSpiedChannels(): array
return $this->spiedChannelNames;
}

/**
* @return RegisterAggregateRepositoryChannels[]
*/
public function getAggregatesAndSagasUnderTest(): array
{
return $this->relatedAggregates;
Expand Down
13 changes: 12 additions & 1 deletion packages/Ecotone/src/Messaging/Handler/ClassDefinition.php
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public function getSingleClassAnnotation(TypeDescriptor $annotationType): object
{
$foundAnnotations = [];
foreach ($this->classAnnotations as $classAnnotation) {
if (TypeDescriptor::createFromVariable($classAnnotation)->equals($annotationType)) {
if (TypeDescriptor::createFromVariable($classAnnotation)->isCompatibleWith($annotationType)) {
$foundAnnotations[] = $classAnnotation;
}
}
Expand All @@ -177,6 +177,17 @@ public function isAnnotation(): bool
}

public function hasClassAnnotation(Type $type): bool
{
foreach ($this->getClassAnnotations() as $classAnnotation) {
if (TypeDescriptor::createFromVariable($classAnnotation)->isCompatibleWith($type)) {
return true;
}
}

return false;
}

public function hasClassAnnotationOfPreciseType(Type $type): bool
{
foreach ($this->getClassAnnotations() as $classAnnotation) {
if (TypeDescriptor::createFromVariable($classAnnotation)->equals($type)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public static function createPrivate(string $name, Type $typeDescriptor, bool $i
public function hasAnnotation(Type $annotationClass): bool
{
foreach ($this->annotations as $annotation) {
if (TypeDescriptor::createFromVariable($annotation)->equals($annotationClass)) {
if (TypeDescriptor::createFromVariable($annotation)->isCompatibleWith($annotationClass)) {
return true;
}
}
Expand Down
52 changes: 35 additions & 17 deletions packages/Ecotone/src/Modelling/Config/ModellingHandlerModule.php
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ public function canHandle($extensionObject): bool
||
$extensionObject instanceof BaseEventSourcingConfiguration
||
$extensionObject instanceof RegisterLoadAggregateChannel;
$extensionObject instanceof RegisterAggregateRepositoryChannels;
}

public function getModuleExtensions(array $serviceExtensions): array
Expand Down Expand Up @@ -374,18 +374,10 @@ public function prepare(Configuration $configuration, array $moduleExtensions, M
$gatewayParameterConverters = [GatewayHeaderBuilder::create($interface->getFirstParameter()->getName(), AggregateMessage::AGGREGATE_OBJECT)];
}


/** @TODO do not require method name in save service */
$methodName = $aggregateClassDefinition->getPublicMethodNames() ? $aggregateClassDefinition->getPublicMethodNames()[0] : '__construct';
$configuration->registerMessageHandler(
$chainMessageHandlerBuilder
->chain(SaveAggregateServiceBuilder::create($aggregateClassDefinition, $methodName, $interfaceToCallRegistry, $baseEventSourcingConfiguration->getSnapshotTriggerThreshold(), $baseEventSourcingConfiguration->getSnapshotsAggregateClasses(), $baseEventSourcingConfiguration->getDocumentStoreReference())
->withInputChannelName($inputChannelName)
->withAggregateRepositoryFactories($this->aggregateRepositoryReferenceNames))
);
$this->registerSaveAggregate($aggregateClassDefinition, $configuration, $chainMessageHandlerBuilder, $interfaceToCallRegistry, $baseEventSourcingConfiguration, $inputChannelName);
} else {
Assert::isTrue($interface->hasFirstParameter(), 'Fetchting repository should have at least one parameter for identifiers: ' . $repositoryGateway);
$this->registerLoadAggregateChannel(
$this->registerLoadAggregate(
$interfaceToCallRegistry->getClassDefinitionFor($interface->getReturnType()),
$interface->canItReturnNull(),
$configuration,
Expand All @@ -406,16 +398,24 @@ public function prepare(Configuration $configuration, array $moduleExtensions, M
);
}

/** @var RegisterLoadAggregateChannel $registerLoadAggregateChannel */
foreach (ExtensionObjectResolver::resolve(RegisterLoadAggregateChannel::class, $moduleExtensions) as $registerLoadAggregateChannel) {
$this->registerLoadAggregateChannel(
$interfaceToCallRegistry->getClassDefinitionFor(TypeDescriptor::create($registerLoadAggregateChannel->getClassName())),
/** @var RegisterAggregateRepositoryChannels $registerAggregateChannel */
foreach (ExtensionObjectResolver::resolve(RegisterAggregateRepositoryChannels::class, $moduleExtensions) as $registerAggregateChannel) {
$aggregateClassDefinition = $interfaceToCallRegistry->getClassDefinitionFor(TypeDescriptor::create($registerAggregateChannel->getClassName()));

$this->registerLoadAggregate(
$aggregateClassDefinition,
false,
$configuration,
ChainMessageHandlerBuilder::create()
->withInputChannelName(self::getRegisterAggregateLoadRepositoryInputChannel($registerLoadAggregateChannel->getClassName())),
->withInputChannelName(self::getRegisterAggregateLoadRepositoryInputChannel($registerAggregateChannel->getClassName())),
$interfaceToCallRegistry
);

$this->registerSaveAggregate($aggregateClassDefinition, $configuration,
ChainMessageHandlerBuilder::create()
->withInputChannelName(self::getRegisterAggregateSaveRepositoryInputChannel($registerAggregateChannel->getClassName()))
->chain(AggregateIdentifierRetrevingServiceBuilder::createWith($aggregateClassDefinition, [], null, $interfaceToCallRegistry)),
$interfaceToCallRegistry, $baseEventSourcingConfiguration, self::getRegisterAggregateSaveRepositoryInputChannel($registerAggregateChannel->getClassName()));
}

$aggregateCommandOrEventHandlers = [];
Expand Down Expand Up @@ -644,12 +644,17 @@ public static function getRegisterAggregateLoadRepositoryInputChannel(string $cl
return self::getAggregateRepositoryInputChannel($className, 'will_load' . $className, false, false);
}

public static function getRegisterAggregateSaveRepositoryInputChannel(string $className): string
{
return self::getAggregateRepositoryInputChannel($className, 'will_save' . $className, false, false);
}

public static function getAggregateRepositoryInputChannel(string $className, string $methodName1, bool $isSave, bool $canReturnNull): string
{
return $className . $methodName1 . ($isSave ? '.save' : '.load' . ($canReturnNull ? '.nullable' : ''));
}

private function registerLoadAggregateChannel(ClassDefinition $aggregateClassDefinition, bool $canReturnNull, Configuration $configuration, ChainMessageHandlerBuilder $chainMessageHandlerBuilder, InterfaceToCallRegistry $interfaceToCallRegistry): void
private function registerLoadAggregate(ClassDefinition $aggregateClassDefinition, bool $canReturnNull, Configuration $configuration, ChainMessageHandlerBuilder $chainMessageHandlerBuilder, InterfaceToCallRegistry $interfaceToCallRegistry): void
{
/** @TODO do not require method name in save service */
$methodName = $aggregateClassDefinition->getPublicMethodNames() ? $aggregateClassDefinition->getPublicMethodNames()[0] : '__construct';
Expand All @@ -669,4 +674,17 @@ private function registerLoadAggregateChannel(ClassDefinition $aggregateClassDef
)
);
}

private function registerSaveAggregate(ClassDefinition $aggregateClassDefinition, Configuration $configuration, ChainMessageHandlerBuilder|\Ecotone\Messaging\Handler\InputOutputMessageHandlerBuilder $chainMessageHandlerBuilder, InterfaceToCallRegistry $interfaceToCallRegistry, BaseEventSourcingConfiguration $baseEventSourcingConfiguration, string $inputChannelName): void
{
/** @TODO do not require method name in save service */
$methodName = $aggregateClassDefinition->getPublicMethodNames() ? $aggregateClassDefinition->getPublicMethodNames()[0] : '__construct';
$configuration->registerMessageHandler(
$chainMessageHandlerBuilder
->chain(
SaveAggregateServiceBuilder::create($aggregateClassDefinition, $methodName, $interfaceToCallRegistry, $baseEventSourcingConfiguration->getSnapshotTriggerThreshold(), $baseEventSourcingConfiguration->getSnapshotsAggregateClasses(), $baseEventSourcingConfiguration->getDocumentStoreReference())
->withAggregateRepositoryFactories($this->aggregateRepositoryReferenceNames)
)
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
<?php

declare(strict_types=1);

namespace Ecotone\Modelling\Config;

final class RegisterAggregateRepositoryChannels
{
public function __construct(private string $className, private bool $isEventSourced)
{
}

public function getClassName(): string
{
return $this->className;
}

public function isEventSourced(): bool
{
return $this->isEventSourced;
}
}

This file was deleted.

Loading

0 comments on commit a65fbda

Please sign in to comment.