Skip to content

Commit

Permalink
Refactor aggregate module (#424)
Browse files Browse the repository at this point in the history
* Simplify aggregate module
  • Loading branch information
dgafka authored Dec 28, 2024
1 parent 0d2a1aa commit d211546
Show file tree
Hide file tree
Showing 72 changed files with 1,645 additions and 2,308 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,6 @@ classesToResolve: [Calendar::class],
->sendCommand(new ScheduleMeeting('1', '2'))
;

self::assertFalse($ecotone->getMessageChannel('calendar')->receive()->getHeaders()->containsKey(AggregateMessage::CALLED_AGGREGATE_OBJECT));
self::assertFalse($ecotone->getMessageChannel('calendar')->receive()->getHeaders()->containsKey(AggregateMessage::CALLED_AGGREGATE_INSTANCE));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@
use Ecotone\Messaging\Handler\Gateway\Gateway;
use Ecotone\Messaging\MessageChannel;
use Ecotone\Messaging\MessagePublisher;
use Ecotone\Modelling\AggregateFlow\SaveAggregate\AggregateResolver\AggregateDefinitionRegistry;
use Ecotone\Modelling\CommandBus;
use Ecotone\Modelling\DistributedBus;
use Ecotone\Modelling\EventBus;
use Ecotone\Modelling\EventSourcingExecutor\GroupedEventSourcingExecutor;
use Ecotone\Modelling\QueryBus;

/**
Expand Down Expand Up @@ -84,12 +86,18 @@ public function getFlowTestSupport(): FlowTestSupport
$this->getCommandBus(),
$this->getEventBus(),
$this->getQueryBus(),
$this->getServiceFromContainer(AggregateDefinitionRegistry::class),
$this->getMessagingTestSupport(),
$this->getGatewayByName(MessagingEntrypoint::class),
$this->configuredMessagingSystem
);
}

/**
* @template T
* @param class-string<T> $referenceName
* @return T
*/
public function getServiceFromContainer(string $referenceName): object
{
return $this->configuredMessagingSystem->getServiceFromContainer($referenceName);
Expand Down
21 changes: 16 additions & 5 deletions packages/Ecotone/src/Lite/Test/FlowTestSupport.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
use Ecotone\Messaging\Conversion\MediaType;
use Ecotone\Messaging\Endpoint\ExecutionPollingMetadata;
use Ecotone\Messaging\Gateway\MessagingEntrypoint;
use Ecotone\Messaging\Handler\TypeDescriptor;
use Ecotone\Messaging\Message;
use Ecotone\Messaging\MessageChannel;
use Ecotone\Messaging\MessageHeaders;
Expand All @@ -19,13 +20,16 @@
use Ecotone\Messaging\Scheduling\TimeSpan;
use Ecotone\Messaging\Support\Assert;
use Ecotone\Messaging\Support\MessageBuilder;
use Ecotone\Modelling\AggregateFlow\SaveAggregate\AggregateResolver\AggregateDefinitionRegistry;
use Ecotone\Modelling\AggregateMessage;
use Ecotone\Modelling\CommandBus;
use Ecotone\Modelling\Config\MessageBusChannel;
use Ecotone\Modelling\Config\AggregrateHandlerModule;
use Ecotone\Modelling\Event;
use Ecotone\Modelling\EventBus;
use Ecotone\Modelling\EventSourcingExecutor\GroupedEventSourcingExecutor;
use Ecotone\Modelling\QueryBus;
use Ecotone\Test\StubEventSourcedAggregate;

/**
* @template T
Expand All @@ -39,6 +43,7 @@ public function __construct(
private CommandBus $commandBus,
private EventBus $eventBus,
private QueryBus $queryBus,
private AggregateDefinitionRegistry $aggregateDefinitionRegistry,
private MessagingTestSupport $testSupportGateway,
private MessagingEntrypoint $messagingEntrypoint,
private ConfiguredMessagingSystem $configuredMessagingSystem
Expand Down Expand Up @@ -179,13 +184,17 @@ public function getEventStreamEvents(string $streamName): array
*/
public function withEventsFor(string|object|array $identifiers, string $aggregateClass, array $events, int $aggregateVersion = 0): self
{
$aggregateDefinition = $this->aggregateDefinitionRegistry->getFor(TypeDescriptor::create($aggregateClass));
Assert::isTrue($aggregateDefinition->isEventSourced(), "Aggregate {$aggregateClass} is not event sourced. Can't store events for it.");

$this->messagingEntrypoint->sendWithHeaders(
$events,
[],
[
AggregateMessage::OVERRIDE_AGGREGATE_IDENTIFIER => is_object($identifiers) ? (string)$identifiers : $identifiers,
AggregateMessage::AGGREGATE_ID => is_object($identifiers) ? (string)$identifiers : $identifiers,
AggregateMessage::TARGET_VERSION => $aggregateVersion,
AggregateMessage::RESULT_AGGREGATE_OBJECT => $aggregateClass,
AggregateMessage::RESULT_AGGREGATE_EVENTS => $events,
AggregateMessage::CALLED_AGGREGATE_CLASS => $aggregateClass,
AggregateMessage::CALLED_AGGREGATE_INSTANCE => new $aggregateClass(),
AggregateMessage::RECORDED_AGGREGATE_EVENTS => $events,
],
AggregrateHandlerModule::getRegisterAggregateSaveRepositoryInputChannel($aggregateClass). '.test_setup_state'
);
Expand All @@ -198,7 +207,8 @@ public function withStateFor(object $aggregate): self
$this->messagingEntrypoint->sendWithHeaders(
$aggregate,
[
AggregateMessage::RESULT_AGGREGATE_OBJECT => $aggregate,
AggregateMessage::CALLED_AGGREGATE_INSTANCE => $aggregate,
AggregateMessage::CALLED_AGGREGATE_CLASS => $aggregate::class,
],
AggregrateHandlerModule::getRegisterAggregateSaveRepositoryInputChannel($aggregate::class). '.test_setup_state'
);
Expand Down Expand Up @@ -397,6 +407,7 @@ public function sendMessageDirectToChannelWithMessageReply(string $targetChannel
}

/**
* @template T
* @param class-string<T> $referenceName
* @return T
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ public function getMessagePublisher(string $referenceName = MessagePublisher::cl

/**
* @throws InvalidArgumentException if trying to find not existing service reference
* @template T
* @param class-string<T> $referenceName
* @return T
*/
public function getServiceFromContainer(string $referenceName): object;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

namespace Ecotone\Messaging\Handler\Enricher;

use Ecotone\Messaging\Config\Container\Definition;
use Ecotone\Messaging\Config\Container\Reference;
use Ecotone\Messaging\Handler\ExpressionEvaluationService;
use Ecotone\Messaging\Message;
use Ecotone\Messaging\Support\Assert;
Expand Down Expand Up @@ -35,6 +37,17 @@ public static function create(ExpressionEvaluationService $expressionEvaluationS
return self::createWithMapping($expressionEvaluationService, '');
}

public static function getDefinition(): Definition
{
return new Definition(
self::class,
[
Reference::to(ExpressionEvaluationService::REFERENCE)
],
[self::class, 'create']
);
}

/**
* @param PropertyPath $propertyNamePath
* @param mixed $dataToEnrich
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace Ecotone\Messaging\Handler\Enricher;

use Ecotone\Messaging\Config\Container\Definition;
use Ecotone\Messaging\MessagingException;
use Ecotone\Messaging\Support\InvalidArgumentException;
use ReflectionClass;
Expand Down Expand Up @@ -39,6 +40,14 @@ public function hasPropertyValue(PropertyPath $propertyPath, $fromData): bool
return true;
}

public static function getDefinition(): Definition
{
return new Definition(
self::class,
[],
);
}

/**
* @param PropertyPath $propertyPath
* @param mixed $fromData
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,6 @@ class MethodInvokerAggregateObjectResolver implements MethodInvokerObjectResolve
{
public function resolveFor(Message $message): object
{
return $message->getHeaders()->get(AggregateMessage::CALLED_AGGREGATE_OBJECT);
return $message->getHeaders()->get(AggregateMessage::CALLED_AGGREGATE_INSTANCE);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
<?php

namespace Ecotone\Messaging\Handler\Router;

use Ecotone\Messaging\Config\Container\DefinedObject;
use Ecotone\Messaging\Config\Container\Definition;
use Ecotone\Messaging\Message;

/**
* Class HeaderValueRouter
* @package Ecotone\Messaging\Handler\Router
* @author Dariusz Gafka <[email protected]>
* @internal
*/
/**
* licence Apache-2.0
*/
final class HeaderExistsRouter implements RouteSelector, DefinedObject
{

private function __construct(private string $headerName, private string $routeToChannel, private string $fallbackRoute)
{
}

public static function create(string $headerName, string $routeToChannel, string $fallbackRoute): self
{
return new self($headerName, $routeToChannel, $fallbackRoute);
}

/**
* @param Message $message
* @return array
*/
public function route(Message $message): array
{
return $message->getHeaders()->containsKey($this->headerName)
? [$this->routeToChannel]
: [$this->fallbackRoute];
}

public function getDefinition(): Definition
{
return new Definition(self::class, [
$this->headerName,
$this->routeToChannel,
$this->fallbackRoute
], 'create');
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,21 @@ public static function createRecipientListRouter(array $recipientList): self
);
}

public static function createHeaderExistsRouter(string $headerName, string $routeToChannel, string $fallbackRoute): self
{
return new self(
HeaderExistsRouter::create($headerName, $routeToChannel, $fallbackRoute)->getDefinition(),
[
$routeToChannel => new Definition(SendToChannelProcessor::class, [
new ChannelReference($routeToChannel),
]),
$fallbackRoute => new Definition(SendToChannelProcessor::class, [
new ChannelReference($fallbackRoute),
]),
]
);
}

public function route(string $routeName, CompilableBuilder $processor): self
{
$this->routeMap[$routeName] = $processor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,24 +133,7 @@ public function getParameterConverters(): array

public function compile(MessagingContainerBuilder $builder): Definition
{
if ($this->expression) {
$objectToInvokeOn = new Definition(ExpressionTransformer::class, [$this->expression, new Reference(ExpressionEvaluationService::REFERENCE), new Reference(ReferenceSearchService::class)]);
$interfaceToCallReference = new InterfaceToCallReference(ExpressionTransformer::class, 'transform');
} else {
$objectToInvokeOn = $this->directObject ?: new Reference($this->objectToInvokeReferenceName);
if ($this->methodNameOrInterface instanceof InterfaceToCall) {
$interfaceToCallReference = InterfaceToCallReference::fromInstance($this->methodNameOrInterface);
} else {
$className = $this->directObject ? \get_class($objectToInvokeOn) : $this->objectToInvokeReferenceName;
$interfaceToCallReference = new InterfaceToCallReference($className, $this->getMethodName());
}
}

$interfaceToCall = $builder->getInterfaceToCall($interfaceToCallReference);

if (! $interfaceToCall->canReturnValue()) {
throw InvalidArgumentException::create("Can't create transformer for {$interfaceToCall}, because method has no return value");
}
list($objectToInvokeOn, $interfaceToCallReference, $interfaceToCall) = $this->prepare($builder);

$newImplementation = MessageProcessorActivatorBuilder::create()
->withEndpointId($this->getEndpointId())
Expand All @@ -172,6 +155,17 @@ public function compile(MessagingContainerBuilder $builder): Definition
return $newImplementation->compile($builder);
}

public function compileProcessor(MessagingContainerBuilder $builder): Definition
{
list($objectToInvokeOn, $interfaceToCallReference, $interfaceToCall) = $this->prepare($builder);

return MethodInvokerBuilder::create(
$objectToInvokeOn,
$interfaceToCallReference,
$this->methodParameterConverterBuilders
)->compile($builder);
}

private function setDirectObjectToInvoke(DefinedObject $objectToInvoke): void
{
$this->directObject = $objectToInvoke;
Expand Down Expand Up @@ -202,4 +196,28 @@ private function getMethodName(): string|InterfaceToCall
? $this->methodNameOrInterface->getMethodName()
: $this->methodNameOrInterface;
}

public function prepare(MessagingContainerBuilder $builder): array
{
if ($this->expression) {
$objectToInvokeOn = new Definition(ExpressionTransformer::class, [$this->expression, new Reference(ExpressionEvaluationService::REFERENCE), new Reference(ReferenceSearchService::class)]);
$interfaceToCallReference = new InterfaceToCallReference(ExpressionTransformer::class, 'transform');
} else {
$objectToInvokeOn = $this->directObject ?: new Reference($this->objectToInvokeReferenceName);
if ($this->methodNameOrInterface instanceof InterfaceToCall) {
$interfaceToCallReference = InterfaceToCallReference::fromInstance($this->methodNameOrInterface);
} else {
$className = $this->directObject ? \get_class($objectToInvokeOn) : $this->objectToInvokeReferenceName;
$interfaceToCallReference = new InterfaceToCallReference($className, $this->getMethodName());
}
}

$interfaceToCall = $builder->getInterfaceToCall($interfaceToCallReference);

if (!$interfaceToCall->canReturnValue()) {
throw InvalidArgumentException::create("Can't create transformer for {$interfaceToCall}, because method has no return value");
}

return array($objectToInvokeOn, $interfaceToCallReference, $interfaceToCall);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
<?php

declare(strict_types=1);

namespace Ecotone\Messaging\Handler\Transformer;

use Ecotone\Messaging\Config\Container\CompilableBuilder;
use Ecotone\Messaging\Config\Container\Definition;
use Ecotone\Messaging\Config\Container\MessagingContainerBuilder;
use Ecotone\Messaging\Config\Container\Reference;

/**
* licence Apache-2.0
*/
final class TransformerProcessorBuilder implements CompilableBuilder
{
private function __construct(private TransformerBuilder $transformerBuilder)
{

}

public static function create(TransformerBuilder $transformerBuilder): self
{
return new self($transformerBuilder);
}

public function compile(MessagingContainerBuilder $builder): Definition|Reference
{
return $this->transformerBuilder->compileProcessor($builder);
}
}
8 changes: 3 additions & 5 deletions packages/Ecotone/src/Messaging/MessageHeaders.php
Original file line number Diff line number Diff line change
Expand Up @@ -280,12 +280,10 @@ public static function unsetAggregateKeys(array $metadata): array
{
unset(
$metadata[AggregateMessage::AGGREGATE_ID],
$metadata[AggregateMessage::CALLED_AGGREGATE_OBJECT],
$metadata[AggregateMessage::CALLED_AGGREGATE_EVENTS],
$metadata[AggregateMessage::RESULT_AGGREGATE_OBJECT],
$metadata[AggregateMessage::RESULT_AGGREGATE_EVENTS],
$metadata[AggregateMessage::CALLED_AGGREGATE_INSTANCE],
$metadata[AggregateMessage::CALLED_AGGREGATE_CLASS],
$metadata[AggregateMessage::RECORDED_AGGREGATE_EVENTS],
$metadata[AggregateMessage::TARGET_VERSION],
$metadata[AggregateMessage::AGGREGATE_OBJECT_EXISTS],
$metadata[AggregateMessage::NULL_EXECUTION_RESULT],
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public function convertToMessage(Message $requestMessage, mixed $result): ?Messa
}

if ($this->isCommandHandler) {
$calledAggregate = $requestMessage->getHeaders()->containsKey(AggregateMessage::CALLED_AGGREGATE_OBJECT) ? $requestMessage->getHeaders()->get(AggregateMessage::CALLED_AGGREGATE_OBJECT) : null;
$calledAggregate = $requestMessage->getHeaders()->containsKey(AggregateMessage::CALLED_AGGREGATE_INSTANCE) ? $requestMessage->getHeaders()->get(AggregateMessage::CALLED_AGGREGATE_INSTANCE) : null;
$versionBeforeHandling = $requestMessage->getHeaders()->containsKey(AggregateMessage::TARGET_VERSION) ? $requestMessage->getHeaders()->get(AggregateMessage::TARGET_VERSION) : null;

if (is_null($versionBeforeHandling) && $this->aggregateVersionProperty) {
Expand All @@ -51,7 +51,6 @@ public function convertToMessage(Message $requestMessage, mixed $result): ?Messa

$resultMessage = $resultMessage->setHeader(AggregateMessage::TARGET_VERSION, $versionBeforeHandling);
}
$resultMessage = $resultMessage->setHeader(AggregateMessage::CALLED_AGGREGATE_OBJECT, $calledAggregate);
}

if (! is_null($result)) {
Expand All @@ -68,6 +67,7 @@ public function convertToMessage(Message $requestMessage, mixed $result): ?Messa
if ($this->isCommandHandler || ! is_null($result)) {
return $resultMessage->build();
}

return null;
}
}
Loading

0 comments on commit d211546

Please sign in to comment.