Skip to content

Commit

Permalink
add subscription setup cli commands & skip setup in subscription run
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidBadura committed Jul 31, 2024
1 parent fe23cbc commit 55e10ec
Show file tree
Hide file tree
Showing 6 changed files with 150 additions and 1 deletion.
47 changes: 47 additions & 0 deletions src/Console/Command/SchemaSubscriptionSetupCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Console\Command;

use Patchlevel\EventSourcing\Console\OutputStyle;
use Patchlevel\EventSourcing\Store\Store;
use Patchlevel\EventSourcing\Store\SubscriptionStore;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

#[AsCommand(
'event-sourcing:schema:subscription-setup',
'setup subscription (pub/sub) for store',
)]
final class SchemaSubscriptionSetupCommand extends Command
{
public function __construct(
private readonly Store $store,
) {
parent::__construct();
}

protected function execute(InputInterface $input, OutputInterface $output): int
{
$io = new OutputStyle($input, $output);

if (!$this->store instanceof SubscriptionStore) {
$io->error('store does not support subscriptions');

return 1;
}

if (!$this->store->supportSubscription()) {
$io->error('store does not support subscriptions');

return 1;
}

$this->store->setupSubscription();

return 0;
}
}
55 changes: 55 additions & 0 deletions src/Console/Command/SchemaSubscriptionTeardownCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Console\Command;

use Patchlevel\EventSourcing\Console\OutputStyle;
use Patchlevel\EventSourcing\Store\Store;
use Patchlevel\EventSourcing\Store\SubscriptionStore;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

use function method_exists;

#[AsCommand(
'event-sourcing:schema:subscription-teardown',
'teardown subscription (pub/sub) for store',
)]
final class SchemaSubscriptionTeardownCommand extends Command
{
public function __construct(
private readonly Store $store,
) {
parent::__construct();
}

protected function execute(InputInterface $input, OutputInterface $output): int
{
$io = new OutputStyle($input, $output);

if (!$this->store instanceof SubscriptionStore) {
$io->error('store does not support subscriptions');

return 1;
}

if (!$this->store->supportSubscription()) {
$io->error('store does not support subscriptions');

return 1;
}

if (method_exists($this->store, 'teardownSubscription')) {
$this->store->teardownSubscription();

return 0;
}

$io->error('store does not support teardownSubscription');

return 1;
}
}
9 changes: 8 additions & 1 deletion src/Console/Command/SubscriptionRunCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ protected function configure(): void
null,
InputOption::VALUE_NONE,
'rebuild (remove & boot) subscriptions before run',
)
->addOption(
'skip-subscription-setup',
null,
InputOption::VALUE_NONE,
'skip subscription setup',
);
}

Expand All @@ -81,11 +87,12 @@ protected function execute(InputInterface $input, OutputInterface $output): int
$timeLimit = InputHelper::nullablePositiveInt($input->getOption('time-limit'));
$sleep = InputHelper::positiveIntOrZero($input->getOption('sleep'));
$rebuild = InputHelper::bool($input->getOption('rebuild'));
$skipSubscriptionSetup = InputHelper::bool($input->getOption('skip-subscription-setup'));

$criteria = $this->subscriptionEngineCriteria($input);
$criteria = $this->resolveCriteriaIntoCriteriaWithOnlyIds($criteria);

if ($this->store instanceof SubscriptionStore) {
if ($this->store instanceof SubscriptionStore && !$skipSubscriptionSetup) {
$this->store->setupSubscription();
}

Expand Down
19 changes: 19 additions & 0 deletions src/Store/DoctrineDbalStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,25 @@ public function setupSubscription(): void
));
}

public function teardownSubscription(): void
{
if (!$this->supportSubscription()) {
return;
}

$functionName = $this->createTriggerFunctionName();

$this->connection->executeStatement(sprintf(
'DROP FUNCTION IF EXISTS %s() CASCADE;',
$functionName,
));

$this->connection->executeStatement(sprintf(
'DROP TRIGGER IF EXISTS notify_trigger ON %s;',
$this->config['table_name'],
));
}

private function createTriggerFunctionName(): string
{
$tableConfig = explode('.', $this->config['table_name']);
Expand Down
19 changes: 19 additions & 0 deletions src/Store/StreamDoctrineDbalStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,25 @@ public function setupSubscription(): void
));
}

public function teardownSubscription(): void
{
if (!$this->supportSubscription()) {
return;
}

$functionName = $this->createTriggerFunctionName();

$this->connection->executeStatement(sprintf(
'DROP FUNCTION IF EXISTS %s() CASCADE;',
$functionName,
));

$this->connection->executeStatement(sprintf(
'DROP TRIGGER IF EXISTS notify_trigger ON %s;',
$this->config['table_name'],
));
}

private function createTriggerFunctionName(): string
{
$tableConfig = explode('.', $this->config['table_name']);
Expand Down
2 changes: 2 additions & 0 deletions src/Store/SubscriptionStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,6 @@ public function supportSubscription(): bool;
public function setupSubscription(): void;

public function wait(int $timeoutMilliseconds): void;

// public function teardownSubscription(): void;
}

0 comments on commit 55e10ec

Please sign in to comment.