From a17ad9c2e545114efc3fd0d739b3109dc6d4db7e Mon Sep 17 00:00:00 2001 From: Daniel Badura Date: Fri, 10 Apr 2026 08:42:28 +0200 Subject: [PATCH] Add dedicated Illuminate based store implementations for the event store, subscription and cryptography. --- .github/workflows/integration.yml | 216 +++ .github/workflows/{phpunit.yml => unit.yml} | 2 +- Makefile | 3 +- compose.yaml | 14 + composer.json | 2 +- composer.lock | 2 +- .../create_eventsourcing_tables.php | 34 +- phpunit.xml.dist | 4 + src/Cryptography/IlluminateCipherKeyStore.php | 96 ++ src/Store/StreamIlluminateStore.php | 480 ++++++ src/Store/StreamIlluminateStoreStream.php | 206 +++ .../Cleanup/IlluminateCleanupTaskHandler.php | 82 + .../Store/IlluminateSubscriptionStore.php | 263 +++ tests/DatabaseManager.php | 100 ++ .../BankAccountSplitStream/AccountId.php | 13 + .../BankAccountSplitStream/BankAccount.php | 78 + .../BankAccountIntegrationTest.php | 273 +++ .../Events/BalanceAdded.php | 19 + .../Events/BankAccountCreated.php | 18 + .../Events/MonthPassed.php | 21 + .../Projection/BankAccountProjector.php | 69 + .../BasicIntegrationTest.php | 389 +++++ .../Command/AdjustStockForProduct.php | 20 + .../Command/ChangeProfileName.php | 18 + .../Command/CreateProfile.php | 16 + .../Command/DecreaseStockForProduct.php | 20 + .../Events/NameChanged.php | 18 + .../Events/ProfileCreated.php | 18 + .../Events/StockAdjusted.php | 20 + .../Events/StockCreated.php | 17 + .../Events/StockDecreased.php | 20 + .../BasicImplementation/Header/BazHeader.php | 17 + .../BasicImplementation/Header/FooHeader.php | 17 + .../MessageDecorator/FooMessageDecorator.php | 18 + .../Processor/SendEmailProcessor.php | 21 + .../BasicImplementation/ProductId.php | 13 + .../BasicImplementation/Profile.php | 53 + .../BasicImplementation/ProfileId.php | 13 + .../ProfileWithCommands.php | 68 + .../Projection/ProfileProjector.php | 74 + .../Query/QueryProfileName.php | 14 + .../BasicImplementation/SendEmailMock.php | 25 + .../Integration/BasicImplementation/Stock.php | 77 + .../BasicImplementation/StockId.php | 18 + .../ChildAggregateIntegrationTest.php | 144 ++ .../ChildAggregate/Events/NameChanged.php | 18 + .../ChildAggregate/Events/ProfileCreated.php | 18 + .../ChildAggregate/Events/ViewTracked.php | 15 + .../ChildAggregate/PersonalInformation.php | 34 + tests/Integration/ChildAggregate/Profile.php | 63 + .../Integration/ChildAggregate/ProfileId.php | 13 + .../Projection/ProfileProjector.php | 63 + .../ChildAggregate/SendEmailMock.php | 25 + tests/Integration/ChildAggregate/Views.php | 32 + tests/Integration/IntegrationTestCase.php | 41 + .../MicroAggregate/Events/NameChanged.php | 18 + .../MicroAggregate/Events/ProfileCreated.php | 18 + .../MicroAggregateIntegrationTest.php | 153 ++ .../MicroAggregate/PersonalInformation.php | 48 + tests/Integration/MicroAggregate/Profile.php | 36 + .../Integration/MicroAggregate/ProfileId.php | 13 + .../Projection/ProfileProjector.php | 63 + .../MicroAggregate/SendEmailMock.php | 25 + .../PersonalData/Events/NameChanged.php | 22 + .../Events/PersonalDataRemoved.php | 16 + .../PersonalData/Events/ProfileCreated.php | 22 + .../PersonalData/PersonalDataTest.php | 191 +++ .../Processor/DeletePersonalDataProcessor.php | 25 + tests/Integration/PersonalData/Profile.php | 70 + tests/Integration/PersonalData/ProfileId.php | 13 + .../Integration/Store/Events/ExternEvent.php | 16 + .../Store/Events/ProfileCreated.php | 18 + tests/Integration/Store/Profile.php | 40 + tests/Integration/Store/ProfileId.php | 13 + .../Store/StreamIlluminateStoreTest.php | 524 ++++++ .../Subscription/Events/AdminPromoted.php | 17 + .../Subscription/Events/NameChanged.php | 18 + .../Subscription/Events/ProfileCreated.php | 18 + tests/Integration/Subscription/Profile.php | 71 + tests/Integration/Subscription/ProfileId.php | 13 + .../Subscriber/ErrorProducerSubscriber.php | 44 + ...rrorProducerWithSelfRecoverySubscriber.php | 64 + .../Subscriber/LookupSubscriber.php | 83 + ...igrateAggregateToStreamStoreSubscriber.php | 93 + .../Subscriber/ProfileNewProjection.php | 58 + .../Subscriber/ProfileProcessor.php | 50 + .../Subscriber/ProfileProjection.php | 79 + .../ProfileProjectionWithCleanup.php | 80 + .../Subscription/SubscriptionTest.php | 1511 +++++++++++++++++ 89 files changed, 6968 insertions(+), 20 deletions(-) create mode 100644 .github/workflows/integration.yml rename .github/workflows/{phpunit.yml => unit.yml} (92%) create mode 100644 compose.yaml create mode 100644 src/Cryptography/IlluminateCipherKeyStore.php create mode 100644 src/Store/StreamIlluminateStore.php create mode 100644 src/Store/StreamIlluminateStoreStream.php create mode 100644 src/Subscription/Cleanup/IlluminateCleanupTaskHandler.php create mode 100644 src/Subscription/Store/IlluminateSubscriptionStore.php create mode 100644 tests/DatabaseManager.php create mode 100644 tests/Integration/BankAccountSplitStream/AccountId.php create mode 100644 tests/Integration/BankAccountSplitStream/BankAccount.php create mode 100644 tests/Integration/BankAccountSplitStream/BankAccountIntegrationTest.php create mode 100644 tests/Integration/BankAccountSplitStream/Events/BalanceAdded.php create mode 100644 tests/Integration/BankAccountSplitStream/Events/BankAccountCreated.php create mode 100644 tests/Integration/BankAccountSplitStream/Events/MonthPassed.php create mode 100644 tests/Integration/BankAccountSplitStream/Projection/BankAccountProjector.php create mode 100644 tests/Integration/BasicImplementation/BasicIntegrationTest.php create mode 100644 tests/Integration/BasicImplementation/Command/AdjustStockForProduct.php create mode 100644 tests/Integration/BasicImplementation/Command/ChangeProfileName.php create mode 100644 tests/Integration/BasicImplementation/Command/CreateProfile.php create mode 100644 tests/Integration/BasicImplementation/Command/DecreaseStockForProduct.php create mode 100644 tests/Integration/BasicImplementation/Events/NameChanged.php create mode 100644 tests/Integration/BasicImplementation/Events/ProfileCreated.php create mode 100644 tests/Integration/BasicImplementation/Events/StockAdjusted.php create mode 100644 tests/Integration/BasicImplementation/Events/StockCreated.php create mode 100644 tests/Integration/BasicImplementation/Events/StockDecreased.php create mode 100644 tests/Integration/BasicImplementation/Header/BazHeader.php create mode 100644 tests/Integration/BasicImplementation/Header/FooHeader.php create mode 100644 tests/Integration/BasicImplementation/MessageDecorator/FooMessageDecorator.php create mode 100644 tests/Integration/BasicImplementation/Processor/SendEmailProcessor.php create mode 100644 tests/Integration/BasicImplementation/ProductId.php create mode 100644 tests/Integration/BasicImplementation/Profile.php create mode 100644 tests/Integration/BasicImplementation/ProfileId.php create mode 100644 tests/Integration/BasicImplementation/ProfileWithCommands.php create mode 100644 tests/Integration/BasicImplementation/Projection/ProfileProjector.php create mode 100644 tests/Integration/BasicImplementation/Query/QueryProfileName.php create mode 100644 tests/Integration/BasicImplementation/SendEmailMock.php create mode 100644 tests/Integration/BasicImplementation/Stock.php create mode 100644 tests/Integration/BasicImplementation/StockId.php create mode 100644 tests/Integration/ChildAggregate/ChildAggregateIntegrationTest.php create mode 100644 tests/Integration/ChildAggregate/Events/NameChanged.php create mode 100644 tests/Integration/ChildAggregate/Events/ProfileCreated.php create mode 100644 tests/Integration/ChildAggregate/Events/ViewTracked.php create mode 100644 tests/Integration/ChildAggregate/PersonalInformation.php create mode 100644 tests/Integration/ChildAggregate/Profile.php create mode 100644 tests/Integration/ChildAggregate/ProfileId.php create mode 100644 tests/Integration/ChildAggregate/Projection/ProfileProjector.php create mode 100644 tests/Integration/ChildAggregate/SendEmailMock.php create mode 100644 tests/Integration/ChildAggregate/Views.php create mode 100644 tests/Integration/IntegrationTestCase.php create mode 100644 tests/Integration/MicroAggregate/Events/NameChanged.php create mode 100644 tests/Integration/MicroAggregate/Events/ProfileCreated.php create mode 100644 tests/Integration/MicroAggregate/MicroAggregateIntegrationTest.php create mode 100644 tests/Integration/MicroAggregate/PersonalInformation.php create mode 100644 tests/Integration/MicroAggregate/Profile.php create mode 100644 tests/Integration/MicroAggregate/ProfileId.php create mode 100644 tests/Integration/MicroAggregate/Projection/ProfileProjector.php create mode 100644 tests/Integration/MicroAggregate/SendEmailMock.php create mode 100644 tests/Integration/PersonalData/Events/NameChanged.php create mode 100644 tests/Integration/PersonalData/Events/PersonalDataRemoved.php create mode 100644 tests/Integration/PersonalData/Events/ProfileCreated.php create mode 100644 tests/Integration/PersonalData/PersonalDataTest.php create mode 100644 tests/Integration/PersonalData/Processor/DeletePersonalDataProcessor.php create mode 100644 tests/Integration/PersonalData/Profile.php create mode 100644 tests/Integration/PersonalData/ProfileId.php create mode 100644 tests/Integration/Store/Events/ExternEvent.php create mode 100644 tests/Integration/Store/Events/ProfileCreated.php create mode 100644 tests/Integration/Store/Profile.php create mode 100644 tests/Integration/Store/ProfileId.php create mode 100644 tests/Integration/Store/StreamIlluminateStoreTest.php create mode 100644 tests/Integration/Subscription/Events/AdminPromoted.php create mode 100644 tests/Integration/Subscription/Events/NameChanged.php create mode 100644 tests/Integration/Subscription/Events/ProfileCreated.php create mode 100644 tests/Integration/Subscription/Profile.php create mode 100644 tests/Integration/Subscription/ProfileId.php create mode 100644 tests/Integration/Subscription/Subscriber/ErrorProducerSubscriber.php create mode 100644 tests/Integration/Subscription/Subscriber/ErrorProducerWithSelfRecoverySubscriber.php create mode 100644 tests/Integration/Subscription/Subscriber/LookupSubscriber.php create mode 100644 tests/Integration/Subscription/Subscriber/MigrateAggregateToStreamStoreSubscriber.php create mode 100644 tests/Integration/Subscription/Subscriber/ProfileNewProjection.php create mode 100644 tests/Integration/Subscription/Subscriber/ProfileProcessor.php create mode 100644 tests/Integration/Subscription/Subscriber/ProfileProjection.php create mode 100644 tests/Integration/Subscription/Subscriber/ProfileProjectionWithCleanup.php create mode 100644 tests/Integration/Subscription/SubscriptionTest.php diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml new file mode 100644 index 0000000..9344198 --- /dev/null +++ b/.github/workflows/integration.yml @@ -0,0 +1,216 @@ +# https://help.github.com/en/categories/automating-your-workflow-with-github-actions + +name: "Integration tests" + +on: + pull_request: + push: + branches: + - "[0-9]+.[0-9]+.x" + - "renovate/*" + +jobs: + postgres: + name: "Postgres" + + runs-on: ${{ matrix.operating-system }} + + services: + postgres: + # Docker Hub image + image: "postgres:${{ matrix.postgres-version }}" + # Provide the password for postgres + env: + POSTGRES_PASSWORD: postgres + POSTGRES_DB: event_store + options: >- + --health-cmd "pg_isready" + ports: + - "5432:5432" + + strategy: + matrix: + dependencies: + - "locked" + php-version: + - "8.5" + operating-system: + - "ubuntu-latest" + postgres-version: + - "14.20" + - "15.15" + - "16.11" + - "17.7" + - "18.1" + + env: + DB_URL: 'pgsql://postgres:postgres@localhost:5432/event_store?charset=utf8' + DB_CONNECTION: 'pgsql' + DB_DATABASE: 'event_store' + + steps: + - name: "Checkout" + uses: actions/checkout@v6 + + - name: "Install PHP" + uses: "shivammathur/setup-php@2.37.0" + with: + coverage: "pcov" + php-version: "${{ matrix.php-version }}" + ini-values: memory_limit=-1 + extensions: pdo_pgsql + + - uses: ramsey/composer-install@4.0.0 + with: + dependency-versions: ${{ matrix.dependencies }} + composer-options: ${{ matrix.composer-options }} + + - name: "Tests" + run: "vendor/bin/phpunit --testsuite=integration --no-coverage" + + mariadb: + name: "mariadb" + + runs-on: ${{ matrix.operating-system }} + + services: + mariadb: + image: "mariadb:${{ matrix.mariadb-version }}" + env: + MYSQL_ALLOW_EMPTY_PASSWORD: yes + MYSQL_DATABASE: event_store + + options: >- + --health-cmd "mariadb-admin ping --silent" + ports: + - "3306:3306" + + strategy: + matrix: + dependencies: + - "locked" + php-version: + - "8.5" + operating-system: + - "ubuntu-latest" + mariadb-version: + - "10.6" + - "10.11" + - "11.4" + - "11.8" + - "12.1" + + env: + DB_URL: 'mysql://root@127.0.0.1:3306/event_store?charset=utf8' + DB_CONNECTION: 'mysql' + DB_DATABASE: 'event_store' + + steps: + - name: "Checkout" + uses: actions/checkout@v6 + + - name: "Install PHP" + uses: "shivammathur/setup-php@2.37.0" + with: + coverage: "pcov" + php-version: "${{ matrix.php-version }}" + ini-values: memory_limit=-1 + extensions: pdo_mysql + + - uses: ramsey/composer-install@4.0.0 + with: + dependency-versions: ${{ matrix.dependencies }} + composer-options: ${{ matrix.composer-options }} + + - name: "Tests" + run: "vendor/bin/phpunit --testsuite=integration --no-coverage" + + mysql: + name: "mysql" + + runs-on: ${{ matrix.operating-system }} + + services: + mysql: + image: "mysql:${{ matrix.mysql-version }}" + + env: + MYSQL_ALLOW_EMPTY_PASSWORD: yes + MYSQL_DATABASE: "event_store" + + options: >- + --health-cmd "mysqladmin ping --silent" + ports: + - "3306:3306" + + strategy: + matrix: + dependencies: + - "locked" + php-version: + - "8.5" + operating-system: + - "ubuntu-latest" + mysql-version: + - "8.0" + - "8.4" + - "9.5" + + env: + DB_URL: 'mysql://root@127.0.0.1:3306/event_store?charset=utf8' + DB_CONNECTION: 'mysql' + DB_DATABASE: 'event_store' + + steps: + - name: "Checkout" + uses: actions/checkout@v6 + + - name: "Install PHP" + uses: "shivammathur/setup-php@2.37.0" + with: + coverage: "pcov" + php-version: "${{ matrix.php-version }}" + ini-values: memory_limit=-1 + extensions: pdo_mysql + + - uses: ramsey/composer-install@4.0.0 + with: + dependency-versions: ${{ matrix.dependencies }} + composer-options: ${{ matrix.composer-options }} + + - name: "Tests" + run: "vendor/bin/phpunit --testsuite=integration --no-coverage" + + sqlite: + name: "Sqlite" + + runs-on: ${{ matrix.operating-system }} + + strategy: + matrix: + dependencies: + - "locked" + php-version: + - "8.5" + operating-system: + - "ubuntu-latest" + + steps: + - name: "Checkout" + uses: actions/checkout@v6 + + - name: "Install PHP" + uses: "shivammathur/setup-php@2.37.0" + with: + coverage: "pcov" + php-version: "${{ matrix.php-version }}" + ini-values: memory_limit=-1 + extensions: pdo_sqlite + + - uses: ramsey/composer-install@4.0.0 + with: + dependency-versions: ${{ matrix.dependencies }} + composer-options: ${{ matrix.composer-options }} + + - name: "Tests" + run: "vendor/bin/phpunit --testsuite=integration --no-coverage" diff --git a/.github/workflows/phpunit.yml b/.github/workflows/unit.yml similarity index 92% rename from .github/workflows/phpunit.yml rename to .github/workflows/unit.yml index dc078e6..538977e 100644 --- a/.github/workflows/phpunit.yml +++ b/.github/workflows/unit.yml @@ -51,4 +51,4 @@ jobs: dependency-versions: ${{ matrix.dependencies }} - name: "Tests" - run: "vendor/bin/phpunit --coverage-clover=clover.xml --coverage-text" + run: "vendor/bin/phpunit --testsuite=unit --coverage-clover=clover.xml --coverage-text" diff --git a/Makefile b/Makefile index 8106244..2d69f3b 100644 --- a/Makefile +++ b/Makefile @@ -22,7 +22,8 @@ phpstan-baseline: vendor .PHONY: phpunit phpunit: vendor ## run phpunit tests - XDEBUG_MODE=coverage vendor/bin/phpunit + XDEBUG_MODE=coverage vendor/bin/phpunit --testsuite=unit + XDEBUG_MODE=off vendor/bin/phpunit --testsuite=integration --no-coverage .PHONY: infection infection: vendor ## run infection diff --git a/compose.yaml b/compose.yaml new file mode 100644 index 0000000..9eb1711 --- /dev/null +++ b/compose.yaml @@ -0,0 +1,14 @@ +services: + postgres: + image: postgres:16-alpine + environment: + POSTGRES_DB: event_store + POSTGRES_USER: postgres + POSTGRES_PASSWORD: postgres + ports: + - "5432:5432" + healthcheck: + test: ["CMD-SHELL", "pg_isready -U postgres -d eventstore"] + interval: 5s + timeout: 5s + retries: 5 diff --git a/composer.json b/composer.json index 9f18013..e37d006 100644 --- a/composer.json +++ b/composer.json @@ -20,7 +20,7 @@ "require": { "php": "~8.3.0 || ~8.4.0 || ~8.5.0", "illuminate/contracts": "^11.0 || ^12.0 || ^13.0", - "patchlevel/event-sourcing": "^3.15.0" + "patchlevel/event-sourcing": "^3.16.0" }, "require-dev": { "ext-pdo_sqlite": "*", diff --git a/composer.lock b/composer.lock index d28499c..078807f 100644 --- a/composer.lock +++ b/composer.lock @@ -4,7 +4,7 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "5a9aa63bf924f6f7049a443ed3f64807", + "content-hash": "0dcd18037fb69f46a92629e34fd2c50b", "packages": [ { "name": "brick/math", diff --git a/database/migrations/create_eventsourcing_tables.php b/database/migrations/create_eventsourcing_tables.php index 29ad7c8..e777191 100644 --- a/database/migrations/create_eventsourcing_tables.php +++ b/database/migrations/create_eventsourcing_tables.php @@ -8,38 +8,40 @@ { public function up(): void { - Schema::create('eventstore', function (Blueprint $table) { - $table->bigIncrements('id'); - $table->string('aggregate', 255); - $table->char('aggregate_id', 36); - $table->integer('playhead'); - $table->string('event', 255); - $table->json('payload'); - $table->dateTime('recorded_on'); - $table->tinyInteger('new_stream_start')->default(0); - $table->tinyInteger('archived')->default(0); + Schema::create('event_store', function (Blueprint $table) { + $table->bigInteger('id', true); + $table->string('stream', 255); + $table->integer('playhead')->nullable(); + $table->string('event_id', 255); + $table->string('event_name', 255); + $table->json('event_payload'); + $table->dateTimeTz('recorded_on'); + $table->boolean('archived')->default(false); $table->json('custom_headers'); - $table->unique(['aggregate', 'aggregate_id', 'playhead']); - $table->index(['aggregate', 'aggregate_id', 'playhead', 'archived']); + + $table->unique('event_id'); + $table->unique(['stream', 'playhead']); + $table->unique(['stream', 'playhead', 'archived']); }); Schema::create('subscriptions', function (Blueprint $table) { $table->string('id', 255); $table->string('group_name', 32); $table->string('run_mode', 16); - $table->integer('position'); $table->string('status', 32); + $table->integer('position'); $table->longText('error_message')->nullable(); $table->string('error_previous_status', 32)->nullable(); $table->json('error_context')->nullable(); $table->integer('retry_attempt'); $table->dateTime('last_saved_at'); + $table->text('cleanup_tasks')->nullable(); $table->index('group_name'); $table->index('status'); $table->primary('id'); }); - Schema::create('eventstore_cipher_keys', function (Blueprint $table) { + Schema::create('crypto_keys', function (Blueprint $table) { $table->string('subject_id', 255); $table->string('crypto_key', 255); $table->string('crypto_method', 255); @@ -50,8 +52,8 @@ public function up(): void public function down(): void { - Schema::dropIfExists('eventstore'); + Schema::dropIfExists('event_store'); Schema::dropIfExists('subscriptions'); - Schema::dropIfExists('eventstore_cipher_keys'); + Schema::dropIfExists('crypto_keys'); } }; diff --git a/phpunit.xml.dist b/phpunit.xml.dist index 110f209..22329a1 100644 --- a/phpunit.xml.dist +++ b/phpunit.xml.dist @@ -11,6 +11,9 @@ tests/Unit + + tests/Integration + @@ -22,6 +25,7 @@ + diff --git a/src/Cryptography/IlluminateCipherKeyStore.php b/src/Cryptography/IlluminateCipherKeyStore.php new file mode 100644 index 0000000..238ff20 --- /dev/null +++ b/src/Cryptography/IlluminateCipherKeyStore.php @@ -0,0 +1,96 @@ + */ + private array $keyCache = []; + + public function __construct( + private readonly Connection $connection, + private readonly string $tableName = 'crypto_keys', + ) { + } + + public function get(string $id): CipherKey + { + if (array_key_exists($id, $this->keyCache)) { + return $this->keyCache[$id]; + } + + /** @var Row|null $result */ + $result = $this->connection->selectOne( + "SELECT * FROM {$this->tableName} WHERE subject_id = :subject_id", + ['subject_id' => $id], + ); + + if ($result === null) { + throw new CipherKeyNotExists($id); + } + + $this->keyCache[$id] = new CipherKey( + base64_decode($result->crypto_key), + $result->crypto_method, + base64_decode($result->crypto_iv), + ); + + return $this->keyCache[$id]; + } + + public function store(string $id, CipherKey $key): void + { + $this->connection->statement( + <<tableName} + (subject_id, crypto_key, crypto_method, crypto_iv) + VALUES + (:subject_id, :crypto_key, :crypto_method, :crypto_iv) +SQL, + [ + 'subject_id' => $id, + 'crypto_key' => base64_encode($key->key), + 'crypto_method' => $key->method, + 'crypto_iv' => base64_encode($key->iv), + ], + ); + + $this->keyCache[$id] = $key; + } + + public function remove(string $id): void + { + $this->connection->statement( + <<tableName} WHERE subject_id = :subject_id +SQL, + ['subject_id' => $id], + ); + + unset($this->keyCache[$id]); + } + + public function clear(): void + { + $this->keyCache = []; + } +} diff --git a/src/Store/StreamIlluminateStore.php b/src/Store/StreamIlluminateStore.php new file mode 100644 index 0000000..3afbb1d --- /dev/null +++ b/src/Store/StreamIlluminateStore.php @@ -0,0 +1,480 @@ +headersSerializer = $headersSerializer ?? DefaultHeadersSerializer::createDefault(); + $this->clock = $clock ?? new SystemClock(); + + $this->config = [ + 'table_name' => 'event_store', + 'locking' => true, + 'lock_id' => self::DEFAULT_LOCK_ID, + 'lock_timeout' => -1, + 'keep_index' => false, + ...$config, + ]; + } + + public function load( + Criteria|null $criteria = null, + int|null $limit = null, + int|null $offset = null, + bool $backwards = false, + ): Stream { + $builder = $this->connection + ->table($this->config['table_name']) + ->select('*') + ->orderBy('id', $backwards ? 'desc' : 'asc'); + + $this->applyCriteria($builder, $criteria ?? new Criteria()); + + if ($limit !== null) { + $builder->limit($limit); + } + + if ($offset !== null) { + $builder->offset($offset); + } + + return new StreamIlluminateStoreStream( + $builder->cursor(), + $this->eventSerializer, + $this->headersSerializer, + ); + } + + public function count(Criteria|null $criteria = null): int + { + $builder = $this->connection + ->table($this->config['table_name']); + + $this->applyCriteria($builder, $criteria ?? new Criteria()); + + return $builder->count(); + } + + public function save(Message ...$messages): void + { + if ($messages === []) { + return; + } + + $this->transactional(function () use ($messages): void { + $columnsLength = $this->config['keep_index'] ? 9 : 8; + $batchSize = (int)floor(self::MAX_UNSIGNED_SMALL_INT / $columnsLength); + + $rows = []; + foreach ($messages as $message) { + $data = $this->eventSerializer->serialize($message->event()); + + try { + $streamName = $message->header(StreamNameHeader::class)->streamName; + } catch (HeaderNotFound $e) { + throw new MissingDataForStorage($e->name, $e); + } + + $eventId = $message->hasHeader(EventIdHeader::class) + ? $message->header(EventIdHeader::class)->eventId + : Uuid::uuid7()->toString(); + + $row = [ + 'stream' => $streamName, + 'playhead' => $message->hasHeader(PlayheadHeader::class) + ? $message->header(PlayheadHeader::class)->playhead + : null, + 'event_id' => $eventId, + 'event_name' => $data->name, + 'event_payload' => $data->payload, + 'recorded_on' => $message->hasHeader(RecordedOnHeader::class) + ? $message->header(RecordedOnHeader::class)->recordedOn + : $this->clock->now(), + 'archived' => $message->hasHeader(ArchivedHeader::class), + 'custom_headers' => $this->headersSerializer->serialize($this->getCustomHeaders($message)), + ]; + + if ($this->config['keep_index']) { + try { + $row['id'] = $message->header(IndexHeader::class)->index; + } catch (HeaderNotFound $e) { + throw new MissingDataForStorage($e->name, $e); + } + } + + $rows[] = $row; + + if (count($rows) !== $batchSize) { + continue; + } + + $this->executeSave($rows); + $rows = []; + } + + if ($rows !== []) { + $this->executeSave($rows); + } + + if (!$this->config['keep_index'] || $this->driverName() !== 'pgsql') { + return; + } + + $this->connection->statement(sprintf( + "SELECT setval('%s', (SELECT MAX(id) FROM %s));", + sprintf('%s_id_seq', $this->config['table_name']), + $this->config['table_name'], + )); + }); + } + + public function transactional(Closure $function): void + { + if ($this->hasLock || !$this->config['locking']) { + $this->connection->transaction($function); + + return; + } + + $this->connection->transaction(function () use ($function): void { + $this->lock(); + + try { + $function(); + } finally { + $this->unlock(); + } + }); + } + + /** @return list */ + public function streams(): array + { + /** @var list $streams */ + $streams = $this->connection + ->table($this->config['table_name']) + ->select('stream') + ->distinct() + ->orderBy('stream') + ->pluck('stream') + ->all(); + + return $streams; + } + + public function remove(Criteria|null $criteria = null): void + { + $builder = $this->connection->table($this->config['table_name']); + + $this->applyCriteria($builder, $criteria ?? new Criteria()); + + $builder->delete(); + } + + public function archive(Criteria|null $criteria = null): void + { + $builder = $this->connection->table($this->config['table_name']); + + $this->applyCriteria($builder, $criteria ?? new Criteria()); + + $builder->update(['archived' => true]); + } + + public function supportSubscription(): bool + { + return $this->driverName() === 'pgsql' + && class_exists(PDO::class) + && method_exists($this->connection, 'getPdo'); + } + + public function setupSubscription(): void + { + if (!$this->supportSubscription()) { + return; + } + + $functionName = $this->createTriggerFunctionName(); + + $this->connection->statement(sprintf( + <<<'SQL' + CREATE OR REPLACE FUNCTION %1$s() RETURNS TRIGGER AS $$ + BEGIN + PERFORM pg_notify('%2$s', NEW.stream::text); + RETURN NEW; + END; + $$ LANGUAGE plpgsql; + SQL, + $functionName, + $this->config['table_name'], + )); + + $this->connection->statement(sprintf( + 'DROP TRIGGER IF EXISTS notify_trigger ON %s;', + $this->config['table_name'], + )); + + $this->connection->statement(sprintf( + 'CREATE TRIGGER notify_trigger AFTER INSERT OR UPDATE ON %1$s FOR EACH ROW EXECUTE PROCEDURE %2$s();', + $this->config['table_name'], + $functionName, + )); + } + + public function wait(int $timeoutMilliseconds): void + { + if (!$this->supportSubscription()) { + return; + } + + $this->connection->statement(sprintf('LISTEN "%s"', $this->config['table_name'])); + + if (!$this->connection instanceof Connection) { + return; + } + + $this->connection->getPdo()->pgsqlGetNotify(PDO::FETCH_ASSOC, $timeoutMilliseconds); + } + + /** @return list */ + private function getCustomHeaders(Message $message): array + { + $filteredHeaders = [ + IndexHeader::class, + StreamNameHeader::class, + EventIdHeader::class, + PlayheadHeader::class, + RecordedOnHeader::class, + ArchivedHeader::class, + ]; + + return array_values( + array_filter( + $message->headers(), + static fn (object $header): bool => !in_array($header::class, $filteredHeaders, true), + ), + ); + } + + private function applyCriteria(Builder $builder, Criteria $criteria): void + { + foreach ($criteria->all() as $criterion) { + switch ($criterion::class) { + case StreamCriterion::class: + if ($criterion->all()) { + break; + } + + if ($criterion->streamName === []) { + break; + } + + $builder->where(static function (Builder $query) use ($criterion): void { + foreach ($criterion->streamName as $index => $streamName) { + if (str_contains($streamName, '*')) { + $query->orWhere('stream', 'LIKE', str_replace('*', '%', $streamName)); + } else { + $query->orWhere('stream', '=', $streamName); + } + } + }); + + break; + case FromPlayheadCriterion::class: + $builder->where('playhead', '>', $criterion->fromPlayhead); + break; + case ToPlayheadCriterion::class: + $builder->where('playhead', '<', $criterion->toPlayhead); + break; + case ArchivedCriterion::class: + $builder->where('archived', '=', $criterion->archived); + break; + case FromIndexCriterion::class: + $builder->where('id', '>', $criterion->fromIndex); + break; + case ToIndexCriterion::class: + $builder->where('id', '<', $criterion->toIndex); + break; + case EventsCriterion::class: + $builder->whereIn('event_name', $criterion->events); + break; + case EventIdCriterion::class: + $builder->where('event_id', '=', $criterion->eventId); + break; + default: + throw new UnsupportedCriterion($criterion::class); + } + } + } + + /** + * @param list $rows + */ + private function executeSave(array $rows): void + { + try { + $this->connection->table($this->config['table_name'])->insert($rows); + } catch (IlluminateUniqueConstraintViolationException $e) { + throw new UniqueConstraintViolation($e); + } + } + + private function lock(): void + { + $this->hasLock = true; + + $driver = $this->driverName(); + + if ($driver === 'pgsql') { + $this->connection->selectOne('SELECT pg_advisory_xact_lock(?)', [$this->config['lock_id']]); + + return; + } + + if ($driver === 'mariadb' || $driver === 'mysql') { + $this->connection->select( + 'SELECT GET_LOCK(?, ?)', + [$this->config['lock_id'], $this->config['lock_timeout']], + ); + + return; + } + + if ($driver === 'sqlite') { + return; // sql locking is not needed because of file locking + } + + throw new LockingNotImplemented(Connection::class); + } + + private function unlock(): void + { + $this->hasLock = false; + + $driver = $this->driverName(); + + if ($driver === 'pgsql') { + return; // lock is released automatically after transaction + } + + if ($driver === 'mariadb' || $driver === 'mysql') { + $this->connection->select('SELECT RELEASE_LOCK(?)', [$this->config['lock_id']]); + + return; + } + + if ($driver === 'sqlite') { + return; // sql locking is not needed because of file locking + } + + throw new LockingNotImplemented(Connection::class); + } + + private function createTriggerFunctionName(): string + { + $tableConfig = explode('.', $this->config['table_name']); + + if (count($tableConfig) === 1) { + return sprintf('notify_%s', $tableConfig[0]); + } + + return sprintf('%s.notify_%s', $tableConfig[0], $tableConfig[1]); + } + + private function driverName(): string + { + if ($this->connection instanceof Connection) { + return $this->connection->getDriverName(); + } + + return 'unknown'; + } +} diff --git a/src/Store/StreamIlluminateStoreStream.php b/src/Store/StreamIlluminateStoreStream.php new file mode 100644 index 0000000..aefcc81 --- /dev/null +++ b/src/Store/StreamIlluminateStoreStream.php @@ -0,0 +1,206 @@ + */ +final class StreamIlluminateStoreStream implements Stream, IteratorAggregate +{ + /** @var iterable|stdClass>|null */ + private iterable|null $result; + + /** @var Generator */ + private Generator|null $generator; + + /** @var positive-int|0|null */ + private int|null $position; + + /** @var positive-int|null */ + private int|null $index; + + /** @param iterable|stdClass> $result */ + public function __construct( + iterable $result, + EventSerializer $eventSerializer, + HeadersSerializer $headersSerializer, + ) { + $this->result = $result; + $this->generator = $this->buildGenerator($result, $eventSerializer, $headersSerializer); + $this->position = null; + $this->index = null; + } + + public function close(): void + { + $this->result = null; + $this->generator = null; + } + + public function next(): void + { + $this->assertNotClosed(); + + $this->generator->next(); + } + + public function end(): bool + { + $this->assertNotClosed(); + + return !$this->generator->valid(); + } + + public function current(): Message|null + { + $this->assertNotClosed(); + + /** @var Message|null $current */ + $current = $this->generator->current(); + + return $current; + } + + /** @return positive-int|0|null */ + public function position(): int|null + { + $this->assertNotClosed(); + + if ($this->position === null) { + $this->generator->key(); + } + + return $this->position; + } + + /** @return positive-int|null */ + public function index(): int|null + { + $this->assertNotClosed(); + + if ($this->index === null) { + $this->generator->key(); + } + + return $this->index; + } + + /** @return Traversable */ + public function getIterator(): Traversable + { + $this->assertNotClosed(); + + return $this->generator; + } + + /** @return Generator */ + + /** + * @param iterable|stdClass> $result + * + * @return Generator + */ + private function buildGenerator( + iterable $result, + EventSerializer $eventSerializer, + HeadersSerializer $headersSerializer, + ): Generator { + foreach ($result as $data) { + if ($this->position === null) { + $this->position = 0; + } else { + ++$this->position; + } + + $id = (int)$this->extractValue($data, 'id'); + + if ($id <= 0) { + continue; + } + + $this->index = $id; + + $payload = $this->extractValue($data, 'event_payload'); + $event = $eventSerializer->deserialize(new SerializedEvent( + (string)$this->extractValue($data, 'event_name'), + is_string($payload) ? $payload : (string)$payload, + )); + + $recordedOn = $this->extractValue($data, 'recorded_on'); + $recordedOnDate = $recordedOn instanceof DateTimeInterface + ? DateTimeImmutable::createFromInterface($recordedOn) + : new DateTimeImmutable((string)$recordedOn); + + $message = Message::create($event) + ->withHeader(new IndexHeader($id)) + ->withHeader(new StreamNameHeader((string)$this->extractValue($data, 'stream'))) + ->withHeader(new RecordedOnHeader($recordedOnDate)) + ->withHeader(new EventIdHeader((string)$this->extractValue($data, 'event_id'))); + + $playhead = $this->extractValue($data, 'playhead'); + + if (is_int($playhead) && $playhead > 0) { + $message = $message->withHeader(new PlayheadHeader($playhead)); + } elseif (!is_int($playhead) && $playhead !== null && (int)$playhead > 0) { + $message = $message->withHeader(new PlayheadHeader((int)$playhead)); + } + + if ($this->extractValue($data, 'archived')) { + $message = $message->withHeader(new ArchivedHeader()); + } + + $customHeaders = $this->extractValue($data, 'custom_headers'); + + yield $message->withHeaders( + $headersSerializer->deserialize(is_string($customHeaders) ? $customHeaders : (string)$customHeaders), + ); + } + } + + private function extractValue(mixed $data, string $field): mixed + { + if (is_array($data)) { + return $data[$field] ?? null; + } + + if ($data instanceof stdClass) { + return $data->{$field} ?? null; + } + + return null; + } + + /** + * @phpstan-assert !null $this->result + * @phpstan-assert !null $this->generator + */ + private function assertNotClosed(): void + { + if ($this->result === null || $this->generator === null) { + throw new StreamClosed(); + } + } +} diff --git a/src/Subscription/Cleanup/IlluminateCleanupTaskHandler.php b/src/Subscription/Cleanup/IlluminateCleanupTaskHandler.php new file mode 100644 index 0000000..6430e6f --- /dev/null +++ b/src/Subscription/Cleanup/IlluminateCleanupTaskHandler.php @@ -0,0 +1,82 @@ +connection($task->connectionName)->getSchemaBuilder(); + if ($schemaManager->hasTable($task->table)) { + $schemaManager->drop($task->table); + } + + return; + } + + if ($task instanceof DropIndexTask) { + $schemaManager = $this->connection($task->connectionName)->getSchemaBuilder(); + + if (!$schemaManager->hasTable($task->table)) { + return; + } + + foreach ($schemaManager->getIndexListing($task->table) as $indexName) { + if (strtolower($indexName) === strtolower($task->index)) { + $schemaManager->table($task->table, static function (Blueprint $table) use ($task): void { + $table->dropIndex($task->index); + }); + break; + } + } + + return; + } + + throw new CleanupTaskNotSupported($task, self::class); + } + + public function supports(object $task): bool + { + return $task instanceof DropTableTask || $task instanceof DropIndexTask; + } + + private function connection(string|null $connectionName): Connection + { + if ($this->connection instanceof ConnectionResolverInterface) { + $connection = $this->connection->connection($connectionName); + + if (!$connection instanceof Connection) { + throw new UnexpectedConnectionType($connectionName, Connection::class, $connection::class); + } + + return $connection; + } + + if ($connectionName === null) { + return $this->connection; + } + + throw new ConnectionNameNotSupported($connectionName); + } +} diff --git a/src/Subscription/Store/IlluminateSubscriptionStore.php b/src/Subscription/Store/IlluminateSubscriptionStore.php new file mode 100644 index 0000000..14c40ad --- /dev/null +++ b/src/Subscription/Store/IlluminateSubscriptionStore.php @@ -0,0 +1,263 @@ +|null, + * retry_attempt: int|string, + * last_saved_at: mixed, + * cleanup_tasks?: string|null, + * } + */ +final class IlluminateSubscriptionStore implements LockableSubscriptionStore +{ + public function __construct( + private readonly Connection $connection, + private readonly ClockInterface $clock = new SystemClock(), + private readonly string $tableName = 'subscriptions', + ) { + } + + public function get(string $subscriptionId): Subscription + { + /** @var Row|null $result */ + $result = $this->connection->table($this->tableName) + ->select('*') + ->where('id', '=', $subscriptionId) + ->first(); + + if ($result === null) { + throw new SubscriptionNotFound($subscriptionId); + } + + return $this->createSubscription($result); + } + + /** @return list */ + public function find(SubscriptionCriteria|null $criteria = null): array + { + $qb = $this->connection->table($this->tableName) + ->select('*') + ->orderBy('id'); + + if ($this->connection->getDriverName() !== 'sqlite') { + dd('not called'); + $qb->lockForUpdate(); + } + + if ($criteria !== null) { + if ($criteria->ids !== null) { + $qb->whereIn('id', $criteria->ids); + } + + if ($criteria->groups !== null) { + $qb->whereIn('group_name', $criteria->groups); + } + + if ($criteria->status !== null) { + $qb->whereIn( + 'status', + array_map(static fn (Status $status) => $status->value, $criteria->status), + ); + } + } + + /** @var list $result */ + $result = $qb->get()->all(); + + return array_map( + fn (object $data) => $this->createSubscription($data), + $result, + ); + } + + public function add(Subscription $subscription): void + { + $subscriptionError = $subscription->subscriptionError(); + + $subscription->updateLastSavedAt($this->clock->now()); + + $this->connection->statement( + <<tableName} + (id, group_name, run_mode, status, position, error_message, error_previous_status, error_context, retry_attempt, last_saved_at, cleanup_tasks) + VALUES + (:id, :group_name, :run_mode, :status, :position, :error_message, :error_previous_status, :error_context, :retry_attempt, :last_saved_at, :cleanup_tasks) +SQL, + [ + 'id' => $subscription->id(), + 'group_name' => $subscription->group(), + 'run_mode' => $subscription->runMode()->value, + 'status' => $subscription->status()->value, + 'position' => $subscription->position(), + 'error_message' => $subscriptionError?->errorMessage, + 'error_previous_status' => $subscriptionError?->previousStatus?->value, + 'error_context' => $subscriptionError?->errorContext !== null ? json_encode($subscriptionError->errorContext, JSON_THROW_ON_ERROR) : null, + 'retry_attempt' => $subscription->retryAttempt(), + 'last_saved_at' => $subscription->lastSavedAt(), + 'cleanup_tasks' => $subscription->cleanupTasks() !== null ? serialize($subscription->cleanupTasks()) : null, + ], + ); + } + + public function update(Subscription $subscription): void + { + $subscriptionError = $subscription->subscriptionError(); + + $subscription->updateLastSavedAt($this->clock->now()); + + $effectedRows = $this->connection->table($this->tableName) + ->where('id', '=', $subscription->id()) + ->update( + [ + 'group_name' => $subscription->group(), + 'run_mode' => $subscription->runMode()->value, + 'status' => $subscription->status()->value, + 'position' => $subscription->position(), + 'error_message' => $subscriptionError?->errorMessage, + 'error_previous_status' => $subscriptionError?->previousStatus?->value, + 'error_context' => $subscriptionError?->errorContext !== null ? json_encode($subscriptionError->errorContext, JSON_THROW_ON_ERROR) : null, + 'retry_attempt' => $subscription->retryAttempt(), + 'last_saved_at' => $subscription->lastSavedAt(), + 'cleanup_tasks' => $subscription->cleanupTasks() !== null ? serialize($subscription->cleanupTasks()) : null, + ], + ); + + if ($effectedRows === 0) { + throw new SubscriptionNotFound($subscription->id()); + } + } + + public function remove(Subscription $subscription): void + { + $this->connection->statement( + <<tableName} WHERE id = :id +SQL, + ['id' => $subscription->id()], + ); + } + + /** + * @param Closure():T $closure + * + * @return T + * + * @throws TransactionCommitNotPossible + * + * @template T + */ + public function inLock(Closure $closure): mixed + { + $this->connection->beginTransaction(); + + try { + $result = $closure(); + } catch (Throwable $e) { + if ($this->connection->transactionLevel() > 0) { + $this->connection->rollBack(); + } + + throw $e; + } + + try { + $this->connection->commit(); + } catch (Throwable $e) { + throw new TransactionCommitNotPossible($e); + } + + return $result; + } + + /** @param Row $row */ + private function createSubscription(object $row): Subscription + { + $context = $this->decodeErrorContext($row->error_context); + + return new Subscription( + $row->id, + $row->group_name, + RunMode::from($row->run_mode), + Status::from($row->status), + (int)$row->position, + $row->error_message !== null ? new SubscriptionError( + $row->error_message, + $row->error_previous_status !== null ? Status::from($row->error_previous_status) : Status::New, + $context, + ) : null, + (int)$row->retry_attempt, + self::normalizeDateTime($row->last_saved_at), + isset($row->cleanup_tasks) && $row->cleanup_tasks !== null ? unserialize($row->cleanup_tasks) : null, + ); + } + + /** @return array|null */ + private function decodeErrorContext(mixed $value): array|null + { + if ($value === null) { + return null; + } + + if (is_array($value)) { + return $value; + } + + if (is_string($value)) { + /** @var array|null $decoded */ + $decoded = json_decode($value, true, 512, JSON_THROW_ON_ERROR); + + return $decoded; + } + + return null; + } + + private static function normalizeDateTime(mixed $value): DateTimeImmutable + { + if ($value instanceof DateTimeImmutable) { + return $value; + } + + if ($value instanceof DateTime) { + return DateTimeImmutable::createFromMutable($value); + } + + return new DateTimeImmutable((string)$value); + } +} diff --git a/tests/DatabaseManager.php b/tests/DatabaseManager.php new file mode 100644 index 0000000..a05ab80 --- /dev/null +++ b/tests/DatabaseManager.php @@ -0,0 +1,100 @@ +parseConfiguration($dbUrl); + + $driver = $config['driver'] ?? null; + + if (!is_string($driver)) { + throw new RuntimeException('missing driver in DB_URL'); + } + + if ($driver === 'sqlite') { + if (!$forceNewConnection) { + return DB::connection(); + } + + return self::makeConnection($config); + } + + self::recreateDatabase($config, $dbName); + $config['database'] = $dbName; + + return self::makeConnection($config); + } + + /** @param array $config */ + private static function recreateDatabase(array $config, string $dbName): void + { + $driver = $config['driver'] ?? null; + + if (!is_string($driver)) { + throw new RuntimeException('missing driver in DB_URL'); + } + + $adminConnection = self::makeConnection($config); + + try { + if ($driver === 'pgsql') { + $adminConnection->statement( + 'SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname = ? AND pid <> pg_backend_pid()', + [$dbName], + ); + } + + $schemaBuilder = $adminConnection->getSchemaBuilder(); + $schemaBuilder->dropDatabaseIfExists($dbName); + $schemaBuilder->createDatabase($dbName); + } finally { + $adminConnection->disconnect(); + } + } + + /** @param array $config */ + private static function makeConnection(array $config): Connection + { + if (!array_key_exists('prefix', $config)) { + $config['prefix'] = ''; + } + + $name = 'event-sourcing-' . uniqid(); + $connection = (new ConnectionFactory(new Container()))->make($config, $name); + + if (!$connection instanceof Connection) { + throw new RuntimeException('No default connection found'); + } + + config()->set("database.connections.$name", $config); + DB::purge($name); + + return DB::connection($name); + } +} diff --git a/tests/Integration/BankAccountSplitStream/AccountId.php b/tests/Integration/BankAccountSplitStream/AccountId.php new file mode 100644 index 0000000..bc03049 --- /dev/null +++ b/tests/Integration/BankAccountSplitStream/AccountId.php @@ -0,0 +1,13 @@ + */ + public array $appliedEvents = []; + + public static function create(AccountId $id, string $name): self + { + $self = new self(); + $self->recordThat(new BankAccountCreated($id, $name)); + + return $self; + } + + /** @param positive-int $newAddedBalance */ + public function addBalance(int $newAddedBalance): void + { + $this->recordThat(new BalanceAdded($this->id, $newAddedBalance)); + } + + public function beginNewMonth(): void + { + $this->recordThat(new MonthPassed($this->id, $this->name, $this->balanceInCents)); + } + + #[Apply(BankAccountCreated::class)] + protected function applyBankAccountCreated(BankAccountCreated $event): void + { + $this->id = $event->accountId; + $this->name = $event->name; + $this->balanceInCents = 0; + $this->appliedEvents[] = $event; + } + + #[Apply(BalanceAdded::class)] + protected function applyBalanceAdded(BalanceAdded $event): void + { + $this->balanceInCents += $event->balanceInCents; + $this->appliedEvents[] = $event; + } + + #[Apply(MonthPassed::class)] + protected function applyMonthPassed(MonthPassed $event): void + { + $this->id = $event->accountId; + $this->name = $event->name; + $this->balanceInCents = $event->balanceInCents; + $this->appliedEvents[] = $event; + } + + public function name(): string + { + return $this->name; + } + + public function balance(): int + { + return $this->balanceInCents; + } +} diff --git a/tests/Integration/BankAccountSplitStream/BankAccountIntegrationTest.php b/tests/Integration/BankAccountSplitStream/BankAccountIntegrationTest.php new file mode 100644 index 0000000..2ca2b36 --- /dev/null +++ b/tests/Integration/BankAccountSplitStream/BankAccountIntegrationTest.php @@ -0,0 +1,273 @@ +connection, + DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']), + ); + + $bankAccountProjector = new BankAccountProjector($this->connection); + + $engine = new DefaultSubscriptionEngine( + $store, + new InMemorySubscriptionStore(), + new MetadataSubscriberAccessorRepository([$bankAccountProjector]), + ); + + $manager = new DefaultRepositoryManager( + new AggregateRootRegistry(['bank_account' => BankAccount::class]), + $store, + null, + null, + new ChainMessageDecorator([ + new SplitStreamDecorator(new AttributeEventMetadataFactory()), + ]), + ); + $repository = $manager->get(BankAccount::class); + + + $engine->setup(); + $engine->boot(); + + $bankAccountId = AccountId::generate(); + $bankAccount = BankAccount::create($bankAccountId, 'John'); + $bankAccount->addBalance(100); + $bankAccount->addBalance(500); + $repository->save($bankAccount); + + $engine->run(); + + $result = $this->connection->selectOne( + 'SELECT * FROM projection_bank_account WHERE id = ?', + [$bankAccountId->toString()], + ); + + self::assertIsObject($result); + self::assertObjectHasProperty('id', $result); + self::assertSame($bankAccountId->toString(), $result->id); + self::assertSame('John', $result->name); + self::assertSame(600, $result->balance_in_cents); + + $manager = new DefaultRepositoryManager( + new AggregateRootRegistry(['bank_account' => BankAccount::class]), + $store, + null, + null, + new ChainMessageDecorator([ + new SplitStreamDecorator(new AttributeEventMetadataFactory()), + ]), + ); + + $repository = $manager->get(BankAccount::class); + $bankAccount = $repository->load($bankAccountId); + + self::assertInstanceOf(BankAccount::class, $bankAccount); + self::assertEquals($bankAccountId, $bankAccount->aggregateRootId()); + self::assertSame(3, $bankAccount->playhead()); + self::assertSame('John', $bankAccount->name()); + self::assertSame(600, $bankAccount->balance()); + self::assertSame(3, count($bankAccount->appliedEvents)); + self::assertInstanceOf(BankAccountCreated::class, $bankAccount->appliedEvents[0]); + self::assertInstanceOf(BalanceAdded::class, $bankAccount->appliedEvents[1]); + self::assertInstanceOf(BalanceAdded::class, $bankAccount->appliedEvents[2]); + + $bankAccount->beginNewMonth(); + $bankAccount->addBalance(200); + $repository->save($bankAccount); + + $engine->run(); + + $result = $this->connection->selectOne( + 'SELECT * FROM projection_bank_account WHERE id = ?', + [$bankAccountId->toString()], + ); + + self::assertIsObject($result); + self::assertObjectHasProperty('id', $result); + self::assertSame($bankAccountId->toString(), $result->id); + self::assertSame('John', $result->name); + self::assertSame(800, $result->balance_in_cents); + + $manager = new DefaultRepositoryManager( + new AggregateRootRegistry(['bank_account' => BankAccount::class]), + $store, + null, + null, + new ChainMessageDecorator([ + new SplitStreamDecorator(new AttributeEventMetadataFactory()), + ]), + ); + $repository = $manager->get(BankAccount::class); + $bankAccount = $repository->load($bankAccountId); + + self::assertInstanceOf(BankAccount::class, $bankAccount); + self::assertEquals($bankAccountId, $bankAccount->aggregateRootId()); + self::assertSame(5, $bankAccount->playhead()); + self::assertSame('John', $bankAccount->name()); + self::assertSame(800, $bankAccount->balance()); + self::assertSame(2, count($bankAccount->appliedEvents)); + self::assertInstanceOf(MonthPassed::class, $bankAccount->appliedEvents[0]); + self::assertInstanceOf(BalanceAdded::class, $bankAccount->appliedEvents[1]); + + /** @var list $messages */ + $messages = iterator_to_array($store->load()); + + self::assertCount(5, $messages); + + self::assertTrue($messages[0]->hasHeader(ArchivedHeader::class)); + self::assertTrue($messages[1]->hasHeader(ArchivedHeader::class)); + self::assertTrue($messages[2]->hasHeader(ArchivedHeader::class)); + + self::assertTrue($messages[3]->hasHeader(StreamStartHeader::class)); + + self::assertFalse($messages[3]->hasHeader(ArchivedHeader::class)); + self::assertFalse($messages[4]->hasHeader(ArchivedHeader::class)); + } + + public function testRemoveArchived(): void + { + $store = new StreamIlluminateStore( + $this->connection, + DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']), + ); + + $bankAccountProjector = new BankAccountProjector($this->connection); + + $engine = new DefaultSubscriptionEngine( + $store, + new InMemorySubscriptionStore(), + new MetadataSubscriberAccessorRepository([$bankAccountProjector]), + ); + + $manager = new DefaultRepositoryManager( + new AggregateRootRegistry(['bank_account' => BankAccount::class]), + $store, + null, + null, + new ChainMessageDecorator([ + new SplitStreamDecorator(new AttributeEventMetadataFactory()), + ]), + ); + $repository = $manager->get(BankAccount::class); + + + $engine->setup(); + $engine->boot(); + + $bankAccountId = AccountId::generate(); + $bankAccount = BankAccount::create($bankAccountId, 'John'); + $bankAccount->addBalance(100); + $bankAccount->addBalance(500); + $repository->save($bankAccount); + + $engine->run(); + + $result = $this->connection->selectOne( + 'SELECT * FROM projection_bank_account WHERE id = ?', + [$bankAccountId->toString()], + ); + + self::assertIsObject($result); + self::assertObjectHasProperty('id', $result); + self::assertSame($bankAccountId->toString(), $result->id); + self::assertSame('John', $result->name); + self::assertSame(600, $result->balance_in_cents); + + $manager = new DefaultRepositoryManager( + new AggregateRootRegistry(['bank_account' => BankAccount::class]), + $store, + null, + null, + new ChainMessageDecorator([ + new SplitStreamDecorator(new AttributeEventMetadataFactory()), + ]), + ); + $repository = $manager->get(BankAccount::class); + $bankAccount = $repository->load($bankAccountId); + + self::assertInstanceOf(BankAccount::class, $bankAccount); + self::assertEquals($bankAccountId, $bankAccount->aggregateRootId()); + self::assertSame(3, $bankAccount->playhead()); + self::assertSame('John', $bankAccount->name()); + self::assertSame(600, $bankAccount->balance()); + self::assertSame(3, count($bankAccount->appliedEvents)); + self::assertInstanceOf(BankAccountCreated::class, $bankAccount->appliedEvents[0]); + self::assertInstanceOf(BalanceAdded::class, $bankAccount->appliedEvents[1]); + self::assertInstanceOf(BalanceAdded::class, $bankAccount->appliedEvents[2]); + + $bankAccount->beginNewMonth(); + $bankAccount->addBalance(200); + $repository->save($bankAccount); + + $engine->run(); + + $result = $this->connection->selectOne( + 'SELECT * FROM projection_bank_account WHERE id = ?', + [$bankAccountId->toString()], + ); + + self::assertIsObject($result); + self::assertObjectHasProperty('id', $result); + self::assertSame($bankAccountId->toString(), $result->id); + self::assertSame('John', $result->name); + self::assertSame(800, $result->balance_in_cents); + + $manager = new DefaultRepositoryManager( + new AggregateRootRegistry(['bank_account' => BankAccount::class]), + $store, + null, + null, + new ChainMessageDecorator([ + new SplitStreamDecorator(new AttributeEventMetadataFactory()), + ]), + ); + $repository = $manager->get(BankAccount::class); + $bankAccount = $repository->load($bankAccountId); + + self::assertInstanceOf(BankAccount::class, $bankAccount); + self::assertEquals($bankAccountId, $bankAccount->aggregateRootId()); + self::assertSame(5, $bankAccount->playhead()); + self::assertSame('John', $bankAccount->name()); + self::assertSame(800, $bankAccount->balance()); + self::assertSame(2, count($bankAccount->appliedEvents)); + self::assertInstanceOf(MonthPassed::class, $bankAccount->appliedEvents[0]); + self::assertInstanceOf(BalanceAdded::class, $bankAccount->appliedEvents[1]); + } +} diff --git a/tests/Integration/BankAccountSplitStream/Events/BalanceAdded.php b/tests/Integration/BankAccountSplitStream/Events/BalanceAdded.php new file mode 100644 index 0000000..a47e113 --- /dev/null +++ b/tests/Integration/BankAccountSplitStream/Events/BalanceAdded.php @@ -0,0 +1,19 @@ +connection->getSchemaBuilder()->create('projection_bank_account', function (Blueprint $table): void { + $table->string('id', 36); + $table->string('name', 255); + $table->integer('balance_in_cents'); + $table->primary('id'); + }); + } + + #[Teardown] + public function drop(): void + { + $this->connection->getSchemaBuilder()->drop('projection_bank_account'); + } + + #[Subscribe(BankAccountCreated::class)] + public function handleBankAccountCreated(Message $message): void + { + $event = $message->event(); + + $this->connection->statement( + 'INSERT INTO projection_bank_account (id, name, balance_in_cents) VALUES(:id, :name, 0);', + [ + 'id' => $event->accountId->toString(), + 'name' => $event->name, + ], + ); + } + + #[Subscribe(BalanceAdded::class)] + public function handleBalanceAdded(Message $message): void + { + $event = $message->event(); + + $this->connection->statement( + 'UPDATE projection_bank_account SET balance_in_cents = balance_in_cents + :balance WHERE id = :id;', + [ + 'id' => $event->accountId->toString(), + 'balance' => $event->balanceInCents, + ], + ); + } +} diff --git a/tests/Integration/BasicImplementation/BasicIntegrationTest.php b/tests/Integration/BasicImplementation/BasicIntegrationTest.php new file mode 100644 index 0000000..febe3a2 --- /dev/null +++ b/tests/Integration/BasicImplementation/BasicIntegrationTest.php @@ -0,0 +1,389 @@ +connection, + DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']), + DefaultHeadersSerializer::createFromPaths([ + __DIR__ . '/Header', + ]), + ); + + $profileProjector = new ProfileProjector($this->connection); + + $engine = new ThrowOnErrorSubscriptionEngine(new DefaultSubscriptionEngine( + $store, + new InMemorySubscriptionStore(), + new MetadataSubscriberAccessorRepository([ + $profileProjector, + new SendEmailProcessor(), + ]), + )); + + $manager = new RunSubscriptionEngineRepositoryManager( + new DefaultRepositoryManager( + new AggregateRootRegistry(['profile' => Profile::class]), + $store, + null, + null, + new FooMessageDecorator(), + ), + $engine, + ); + + $engine->setup(skipBooting: true); + + $profileId = ProfileId::generate(); + $profile = Profile::create($profileId, 'John'); + + $repository = $manager->get(Profile::class); + $repository->save($profile); + + $result = $this->connection->selectOne( + 'SELECT * FROM projection_profile WHERE id = ?', + [$profileId->toString()], + ); + + self::assertIsObject($result); + self::assertObjectHasProperty('id', $result); + self::assertSame($profileId->toString(), $result->id); + self::assertSame('John', $result->name); + + $repository = $manager->get(Profile::class); + $profile = $repository->load($profileId); + + self::assertInstanceOf(Profile::class, $profile); + self::assertEquals($profileId, $profile->aggregateRootId()); + self::assertSame(1, $profile->playhead()); + self::assertSame('John', $profile->name()); + self::assertSame(1, SendEmailMock::count()); + } + + public function testSnapshot(): void + { + $store = new StreamIlluminateStore( + $this->connection, + DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']), + DefaultHeadersSerializer::createFromPaths([ + __DIR__ . '/Header', + ]), + ); + + $profileProjection = new ProfileProjector($this->connection); + + $engine = new ThrowOnErrorSubscriptionEngine(new DefaultSubscriptionEngine( + $store, + new InMemorySubscriptionStore(), + new MetadataSubscriberAccessorRepository([ + $profileProjection, + new SendEmailProcessor(), + ]), + )); + + $manager = new RunSubscriptionEngineRepositoryManager( + new DefaultRepositoryManager( + new AggregateRootRegistry(['profile' => Profile::class]), + $store, + null, + new DefaultSnapshotStore(['default' => new InMemorySnapshotAdapter()]), + new FooMessageDecorator(), + ), + $engine, + ); + + $repository = $manager->get(Profile::class); + $engine->setup(skipBooting: true); + + $profileId = ProfileId::generate(); + $profile = Profile::create($profileId, 'John'); + $repository->save($profile); + + $result = $this->connection->selectOne( + 'SELECT * FROM projection_profile WHERE id = ?', + [$profileId->toString()], + ); + + self::assertIsObject($result); + self::assertObjectHasProperty('id', $result); + self::assertSame($profileId->toString(), $result->id); + self::assertSame('John', $result->name); + + $repository = $manager->get(Profile::class); + $profile = $repository->load($profileId); + + self::assertInstanceOf(Profile::class, $profile); + self::assertEquals($profileId, $profile->aggregateRootId()); + self::assertSame(1, $profile->playhead()); + self::assertSame('John', $profile->name()); + self::assertSame(1, SendEmailMock::count()); + } + + public function testTempProjection(): void + { + $store = new StreamIlluminateStore( + $this->connection, + DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']), + DefaultHeadersSerializer::createFromPaths([ + __DIR__ . '/Header', + ]), + ); + + $manager = new DefaultRepositoryManager( + new AggregateRootRegistry(['profile' => Profile::class]), + $store, + null, + new DefaultSnapshotStore(['default' => new InMemorySnapshotAdapter()]), + new FooMessageDecorator(), + ); + + $repository = $manager->get(Profile::class); + + $profileId = ProfileId::generate(); + $profile = Profile::create($profileId, 'John'); + + for ($i = 0; $i < 100; $i++) { + $profile->changeName('John' . $i); + } + + $repository->save($profile); + + $state = (new Reducer()) + ->initState(['name' => 'unknown']) + ->match([ + ProfileCreated::class => static function (Message $message): array { + return ['name' => $message->event()->name]; + }, + NameChanged::class => static function (Message $message): array { + return ['name' => $message->event()->name]; + }, + ]) + ->reduce( + new Pipe( + $store->load(new Criteria( + new StreamCriterion('profile-' . $profileId->toString()), + )), + new UntilEventTranslator(new DateTimeImmutable()), + ), + ); + + self::assertSame(['name' => 'John99'], $state); + } + + public function testCommandBus(): void + { + $store = new StreamIlluminateStore( + $this->connection, + DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']), + DefaultHeadersSerializer::createFromPaths([ + __DIR__ . '/Header', + ]), + ); + + $aggregateRootRegistry = new AggregateRootRegistry(['profile_with_commands' => ProfileWithCommands::class]); + + $manager = new DefaultRepositoryManager( + new AggregateRootRegistry(['profile_with_commands' => ProfileWithCommands::class]), + $store, + null, + new DefaultSnapshotStore(['default' => new InMemorySnapshotAdapter()]), + new FooMessageDecorator(), + ); + + $profileProjection = new ProfileProjector($this->connection); + + $engine = new ThrowOnErrorSubscriptionEngine(new DefaultSubscriptionEngine( + $store, + new InMemorySubscriptionStore(), + new MetadataSubscriberAccessorRepository([ + $profileProjection, + new SendEmailProcessor(), + ]), + )); + + $manager = new RunSubscriptionEngineRepositoryManager($manager, $engine); + + $commandBus = SyncCommandBus::createForAggregateHandlers( + $aggregateRootRegistry, + $manager, + new ServiceLocator([ + ClockInterface::class => new SystemClock(), + 'env' => 'test', + ]), + ); + + $engine->setup(skipBooting: true); + + $profileId = ProfileId::generate(); + + $commandBus->dispatch(new CreateProfile($profileId, 'John')); + $commandBus->dispatch(new ChangeProfileName($profileId, 'John Doe')); + + $result = $this->connection->selectOne( + 'SELECT * FROM projection_profile WHERE id = ?', + [$profileId->toString()], + ); + + self::assertIsObject($result); + self::assertObjectHasProperty('id', $result); + self::assertSame($profileId->toString(), $result->id); + self::assertSame('John Doe', $result->name); + + $repository = $manager->get(ProfileWithCommands::class); + $profile = $repository->load($profileId); + + self::assertInstanceOf(ProfileWithCommands::class, $profile); + self::assertEquals($profileId, $profile->aggregateRootId()); + self::assertSame(2, $profile->playhead()); + self::assertSame('John Doe', $profile->name()); + self::assertSame(1, SendEmailMock::count()); + } + + public function testQueryBus(): void + { + $store = new StreamIlluminateStore( + $this->connection, + DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']), + DefaultHeadersSerializer::createFromPaths([ + __DIR__ . '/Header', + ]), + ); + + $aggregateRootRegistry = new AggregateRootRegistry(['profile_with_commands' => ProfileWithCommands::class]); + + $manager = new DefaultRepositoryManager( + new AggregateRootRegistry(['profile_with_commands' => ProfileWithCommands::class]), + $store, + null, + new DefaultSnapshotStore(['default' => new InMemorySnapshotAdapter()]), + new FooMessageDecorator(), + ); + + $profileProjection = new ProfileProjector($this->connection); + + $engine = new ThrowOnErrorSubscriptionEngine(new DefaultSubscriptionEngine( + $store, + new InMemorySubscriptionStore(), + new MetadataSubscriberAccessorRepository([ + $profileProjection, + new SendEmailProcessor(), + ]), + )); + + $manager = new RunSubscriptionEngineRepositoryManager( + $manager, + $engine, + ); + + $commandBus = SyncCommandBus::createForAggregateHandlers( + $aggregateRootRegistry, + $manager, + new ServiceLocator([ + ClockInterface::class => new SystemClock(), + 'env' => 'test', + ]), + ); + + $queryBus = new SyncQueryBus(new ServiceHandlerProvider([$profileProjection])); + + $engine->setup(skipBooting: true); + + $profileId = ProfileId::generate(); + + $commandBus->dispatch(new CreateProfile($profileId, 'John')); + $commandBus->dispatch(new ChangeProfileName($profileId, 'John Doe')); + + $result = $queryBus->dispatch(new QueryProfileName($profileId)); + + self::assertSame('John Doe', $result); + } + + public function testAggregateInitialization(): void + { + $store = new StreamIlluminateStore( + $this->connection, + DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']), + DefaultHeadersSerializer::createFromPaths([ + __DIR__ . '/Header', + ]), + ); + + $aggregateRootRegistry = new AggregateRootRegistry(['stock' => Stock::class]); + + $manager = new DefaultRepositoryManager( + $aggregateRootRegistry, + $store, + null, + new DefaultSnapshotStore(['default' => new InMemorySnapshotAdapter()]), + new FooMessageDecorator(), + ); + + $commandBus = SyncCommandBus::createForAggregateHandlers( + $aggregateRootRegistry, + $manager, + ); + + $stockId = StockId::create(); + $productId = ProductId::generate(); + + $commandBus->dispatch(new AdjustStockForProduct($stockId, $productId, 5)); + $commandBus->dispatch(new DecreaseStockForProduct($stockId, $productId, 3)); + + $repository = $manager->get(Stock::class); + $stock = $repository->load($stockId); + + self::assertEquals($stockId, $stock->aggregateRootId()); + self::assertSame(3, $stock->playhead()); + self::assertSame(2, $stock->stockFor($productId)); + } +} diff --git a/tests/Integration/BasicImplementation/Command/AdjustStockForProduct.php b/tests/Integration/BasicImplementation/Command/AdjustStockForProduct.php new file mode 100644 index 0000000..f864c17 --- /dev/null +++ b/tests/Integration/BasicImplementation/Command/AdjustStockForProduct.php @@ -0,0 +1,20 @@ +withHeader(new FooHeader('bar'))->withHeader(new BazHeader('test')); + } +} diff --git a/tests/Integration/BasicImplementation/Processor/SendEmailProcessor.php b/tests/Integration/BasicImplementation/Processor/SendEmailProcessor.php new file mode 100644 index 0000000..cbe9acc --- /dev/null +++ b/tests/Integration/BasicImplementation/Processor/SendEmailProcessor.php @@ -0,0 +1,21 @@ +recordThat(new ProfileCreated($id, $name)); + + return $self; + } + + public function changeName(string $name): void + { + $this->recordThat(new NameChanged($this->id, $name)); + } + + #[Apply(ProfileCreated::class)] + protected function applyProfileCreated(ProfileCreated $event): void + { + $this->id = $event->profileId; + $this->name = $event->name; + } + + #[Apply(NameChanged::class)] + protected function applyNameChanged(NameChanged $event): void + { + $this->name = $event->name; + } + + public function name(): string + { + return $this->name; + } +} diff --git a/tests/Integration/BasicImplementation/ProfileId.php b/tests/Integration/BasicImplementation/ProfileId.php new file mode 100644 index 0000000..a9ff472 --- /dev/null +++ b/tests/Integration/BasicImplementation/ProfileId.php @@ -0,0 +1,13 @@ +recordThat(new ProfileCreated($command->id, $command->name)); + + return $self; + } + + #[Handle] + public function changeName( + ChangeProfileName $command, + ClockInterface $clock, + #[Inject('env')] + string $env, + ): void { + $this->recordThat(new NameChanged($this->id, $command->name)); + } + + #[Apply] + protected function applyProfileCreated(ProfileCreated $event): void + { + $this->id = $event->profileId; + $this->name = $event->name; + } + + #[Apply] + protected function applyNameChanged(NameChanged $event): void + { + $this->name = $event->name; + } + + public function name(): string + { + return $this->name; + } +} diff --git a/tests/Integration/BasicImplementation/Projection/ProfileProjector.php b/tests/Integration/BasicImplementation/Projection/ProfileProjector.php new file mode 100644 index 0000000..c8a29ed --- /dev/null +++ b/tests/Integration/BasicImplementation/Projection/ProfileProjector.php @@ -0,0 +1,74 @@ +connection->getSchemaBuilder()->create('projection_profile', function (Blueprint $table): void { + $table->string('id', 36); + $table->string('name', 255); + $table->primary('id'); + }); + } + + #[Teardown] + public function drop(): void + { + $this->connection->getSchemaBuilder()->drop('projection_profile'); + } + + #[Subscribe(ProfileCreated::class)] + public function handleProfileCreated(ProfileCreated $profileCreated): void + { + $this->connection->statement( + 'INSERT INTO projection_profile (id, name) VALUES(:id, :name);', + [ + 'id' => $profileCreated->profileId->toString(), + 'name' => $profileCreated->name, + ], + ); + } + + #[Subscribe(NameChanged::class)] + public function handleNameChanged(NameChanged $nameChanged): void + { + $this->connection->statement( + 'UPDATE projection_profile SET name = :name WHERE id = :id;', + [ + 'id' => $nameChanged->id->toString(), + 'name' => $nameChanged->name, + ], + ); + } + + #[Answer] + public function getProfileName(QueryProfileName $queryProfileName): string + { + return $this->connection->selectOne( + 'SELECT name FROM projection_profile WHERE id = :id', + ['id' => $queryProfileName->id->toString()], + )->name; + } +} diff --git a/tests/Integration/BasicImplementation/Query/QueryProfileName.php b/tests/Integration/BasicImplementation/Query/QueryProfileName.php new file mode 100644 index 0000000..b001082 --- /dev/null +++ b/tests/Integration/BasicImplementation/Query/QueryProfileName.php @@ -0,0 +1,14 @@ + */ + private array $stock; + + #[AutoInitialize] + public static function initialize(StockId $id): static + { + $stock = new self(); + $stock->recordThat(new StockCreated($id)); + + return $stock; + } + + public function id(): StockId + { + return $this->id; + } + + public function stockFor(ProductId $productId): int + { + return $this->stock[$productId->toString()] ?? 0; + } + + #[Handle] + public function decreaseStock(DecreaseStockForProduct $command): void + { + $this->recordThat(new StockDecreased($this->id, $command->productId, $command->quantity)); + } + + #[Handle] + public function adjustStock(AdjustStockForProduct $command): void + { + $this->recordThat(new StockAdjusted($this->id, $command->productId, $command->quantity)); + } + + #[Apply] + protected function applyStockCreated(StockCreated $event): void + { + $this->id = $event->stockId; + $this->stock = []; + } + + #[Apply] + protected function applyStockDecreased(StockDecreased $event): void + { + $this->stock[$event->productId->toString()] = $this->stockFor($event->productId) - $event->quantity; + } + + #[Apply] + protected function applyStockAdjusted(StockAdjusted $event): void + { + $this->stock[$event->productId->toString()] = $event->quantity; + } +} diff --git a/tests/Integration/BasicImplementation/StockId.php b/tests/Integration/BasicImplementation/StockId.php new file mode 100644 index 0000000..99b8eaf --- /dev/null +++ b/tests/Integration/BasicImplementation/StockId.php @@ -0,0 +1,18 @@ +connection, + DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']), + ); + + $profileProjector = new ProfileProjector($this->connection); + + $engine = new ThrowOnErrorSubscriptionEngine(new DefaultSubscriptionEngine( + $store, + new InMemorySubscriptionStore(), + new MetadataSubscriberAccessorRepository([$profileProjector]), + )); + + $manager = new RunSubscriptionEngineRepositoryManager( + new DefaultRepositoryManager( + new AggregateRootRegistry(['profile' => Profile::class]), + $store, + null, + null, + ), + $engine, + ); + + $repository = $manager->get(Profile::class); + + $engine->setup(skipBooting: true); + + $profileId = ProfileId::generate(); + $profile = Profile::create($profileId, 'John'); + $profile->changeName('Snow'); + $profile->trackView(); + $repository->save($profile); + + $result = $this->connection->selectOne( + 'SELECT * FROM projection_profile WHERE id = ?', + [$profileId->toString()], + ); + + self::assertIsObject($result); + self::assertObjectHasProperty('id', $result); + self::assertSame($profileId->toString(), $result->id); + self::assertSame('Snow', $result->name); + + $repository = $manager->get(Profile::class); + $profile = $repository->load($profileId); + + self::assertInstanceOf(Profile::class, $profile); + self::assertEquals($profileId, $profile->aggregateRootId()); + self::assertSame(3, $profile->playhead()); + self::assertSame('Snow', $profile->name()); + self::assertSame(1, $profile->views()); + } + + public function testSnapshot(): void + { + $store = new StreamIlluminateStore( + $this->connection, + DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']), + ); + + $profileProjection = new ProfileProjector($this->connection); + + $engine = new DefaultSubscriptionEngine( + $store, + new InMemorySubscriptionStore(), + new MetadataSubscriberAccessorRepository([$profileProjection]), + ); + + $manager = new RunSubscriptionEngineRepositoryManager( + new DefaultRepositoryManager( + new AggregateRootRegistry(['profile' => Profile::class]), + $store, + null, + new DefaultSnapshotStore(['default' => new InMemorySnapshotAdapter()]), + ), + $engine, + ); + + $repository = $manager->get(Profile::class); + + + $engine->setup(skipBooting: true); + + $profileId = ProfileId::generate(); + $profile = Profile::create($profileId, 'John'); + $repository->save($profile); + + $result = $this->connection->selectOne( + 'SELECT * FROM projection_profile WHERE id = ?', + [$profileId->toString()], + ); + + self::assertIsObject($result); + self::assertObjectHasProperty('id', $result); + self::assertSame($profileId->toString(), $result->id); + self::assertSame('John', $result->name); + + $repository = $manager->get(Profile::class); + + // create snapshot + $repository->load($profileId); + + // load from snapshot + $profile = $repository->load($profileId); + + $profile->changeName('Snow'); + $profile->trackView(); + $repository->save($profile); + + $profile = $repository->load($profileId); + + self::assertInstanceOf(Profile::class, $profile); + self::assertEquals($profileId, $profile->aggregateRootId()); + self::assertSame(3, $profile->playhead()); + self::assertSame('Snow', $profile->name()); + self::assertSame(1, $profile->views()); + } +} diff --git a/tests/Integration/ChildAggregate/Events/NameChanged.php b/tests/Integration/ChildAggregate/Events/NameChanged.php new file mode 100644 index 0000000..bd040d1 --- /dev/null +++ b/tests/Integration/ChildAggregate/Events/NameChanged.php @@ -0,0 +1,18 @@ +name = $event->name; + } + + public function name(): string + { + return $this->name; + } + + public function changeName(string $name): void + { + $this->recordThat(new NameChanged($this->id, $name)); + } +} diff --git a/tests/Integration/ChildAggregate/Profile.php b/tests/Integration/ChildAggregate/Profile.php new file mode 100644 index 0000000..bad6d78 --- /dev/null +++ b/tests/Integration/ChildAggregate/Profile.php @@ -0,0 +1,63 @@ +recordThat(new ProfileCreated($id, $name)); + + return $self; + } + + #[Apply(ProfileCreated::class)] + protected function applyProfileCreated(ProfileCreated $event): void + { + $this->id = $event->profileId; + $this->personalInformation = new PersonalInformation($this->id, $event->name); + $this->views = new Views(); + } + + public function name(): string + { + return $this->personalInformation->name(); + } + + public function views(): int + { + return $this->views->views(); + } + + public function changeName(string $name): void + { + $this->personalInformation->changeName($name); + } + + public function trackView(): void + { + $this->views->trackView(); + } +} diff --git a/tests/Integration/ChildAggregate/ProfileId.php b/tests/Integration/ChildAggregate/ProfileId.php new file mode 100644 index 0000000..d94f942 --- /dev/null +++ b/tests/Integration/ChildAggregate/ProfileId.php @@ -0,0 +1,13 @@ +connection->getSchemaBuilder()->create('projection_profile', function (Blueprint $table): void { + $table->string('id', 36); + $table->string('name', 255); + $table->primary('id'); + }); + } + + #[Teardown] + public function drop(): void + { + $this->connection->getSchemaBuilder()->drop('projection_profile'); + } + + #[Subscribe(ProfileCreated::class)] + public function handleProfileCreated(ProfileCreated $profileCreated): void + { + $this->connection->statement( + 'INSERT INTO projection_profile (id, name) VALUES(:id, :name);', + [ + 'id' => $profileCreated->profileId->toString(), + 'name' => $profileCreated->name, + ], + ); + } + + #[Subscribe(NameChanged::class)] + public function handleNameChanged(NameChanged $nameChanged): void + { + $this->connection->statement( + 'UPDATE projection_profile SET name = :name WHERE id = :id;', + [ + 'id' => $nameChanged->id->toString(), + 'name' => $nameChanged->name, + ], + ); + } +} diff --git a/tests/Integration/ChildAggregate/SendEmailMock.php b/tests/Integration/ChildAggregate/SendEmailMock.php new file mode 100644 index 0000000..36973b2 --- /dev/null +++ b/tests/Integration/ChildAggregate/SendEmailMock.php @@ -0,0 +1,25 @@ +views++; + } + + public function views(): int + { + return $this->views; + } + + public function trackView(): void + { + $this->recordThat(new ViewTracked()); + } +} diff --git a/tests/Integration/IntegrationTestCase.php b/tests/Integration/IntegrationTestCase.php new file mode 100644 index 0000000..9b0f32e --- /dev/null +++ b/tests/Integration/IntegrationTestCase.php @@ -0,0 +1,41 @@ +up(); + + $this->connection = DatabaseManager::createConnection(); + } + + public function tearDown(): void + { + $this->connection->disconnect(); + SendEmailMock::reset(); + parent::tearDown(); + } +} \ No newline at end of file diff --git a/tests/Integration/MicroAggregate/Events/NameChanged.php b/tests/Integration/MicroAggregate/Events/NameChanged.php new file mode 100644 index 0000000..ae67f41 --- /dev/null +++ b/tests/Integration/MicroAggregate/Events/NameChanged.php @@ -0,0 +1,18 @@ +connection, + DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']), + ); + + $profileProjector = new ProfileProjector($this->connection); + + $engine = new ThrowOnErrorSubscriptionEngine(new DefaultSubscriptionEngine( + $store, + new InMemorySubscriptionStore(), + new MetadataSubscriberAccessorRepository([$profileProjector]), + )); + + $manager = new RunSubscriptionEngineRepositoryManager( + new DefaultRepositoryManager( + new AggregateRootRegistry([ + 'profile' => Profile::class, + 'personal_information' => PersonalInformation::class, + ]), + $store, + null, + null, + ), + $engine, + ); + + $profileRepository = $manager->get(Profile::class); + $personalInformationRepository = $manager->get(PersonalInformation::class); + + $engine->setup(skipBooting: true); + + $profileId = ProfileId::generate(); + $profile = Profile::create($profileId, 'John'); + $profileRepository->save($profile); + + $personalInformation = $personalInformationRepository->load($profileId); + $personalInformation->changeName('Snow'); + $personalInformationRepository->save($personalInformation); + + $result = $this->connection->selectOne( + 'SELECT * FROM projection_profile WHERE id = ?', + [$profileId->toString()], + ); + + self::assertIsObject($result); + self::assertObjectHasProperty('id', $result); + self::assertSame($profileId->toString(), $result->id); + self::assertSame('Snow', $result->name); + + $profile = $profileRepository->load($profileId); + $personalInformation = $personalInformationRepository->load($profileId); + + self::assertInstanceOf(Profile::class, $profile); + self::assertEquals($profileId, $profile->aggregateRootId()); + self::assertSame(2, $profile->playhead()); + self::assertSame('Snow', $personalInformation->name()); + } + + public function testSnapshot(): void + { + $store = new StreamIlluminateStore( + $this->connection, + DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']), + ); + + $profileProjection = new ProfileProjector($this->connection); + + $engine = new DefaultSubscriptionEngine( + $store, + new InMemorySubscriptionStore(), + new MetadataSubscriberAccessorRepository([$profileProjection]), + ); + + $manager = new RunSubscriptionEngineRepositoryManager( + new DefaultRepositoryManager( + new AggregateRootRegistry([ + 'profile' => Profile::class, + 'personal_information' => PersonalInformation::class, + ]), + $store, + null, + new DefaultSnapshotStore(['default' => new InMemorySnapshotAdapter()]), + ), + $engine, + ); + + $profileRepository = $manager->get(Profile::class); + $personalInformationRepository = $manager->get(PersonalInformation::class); + + + $engine->setup(skipBooting: true); + + $profileId = ProfileId::generate(); + $profile = Profile::create($profileId, 'John'); + $profileRepository->save($profile); + + $result = $this->connection->selectOne( + 'SELECT * FROM projection_profile WHERE id = ?', + [$profileId->toString()], + ); + + self::assertIsObject($result); + self::assertObjectHasProperty('id', $result); + self::assertSame($profileId->toString(), $result->id); + self::assertSame('John', $result->name); + + // create snapshot + $profileRepository->load($profileId); + $personalInformationRepository->load($profileId); + + // load from snapshot + $personalInformation = $personalInformationRepository->load($profileId); + + $personalInformation->changeName('Snow'); + $personalInformationRepository->save($personalInformation); + + $profile = $profileRepository->load($profileId); + $personalInformation = $personalInformationRepository->load($profileId); + + self::assertInstanceOf(Profile::class, $profile); + self::assertEquals($profileId, $profile->aggregateRootId()); + self::assertSame(2, $profile->playhead()); + self::assertSame('Snow', $personalInformation->name()); + } +} diff --git a/tests/Integration/MicroAggregate/PersonalInformation.php b/tests/Integration/MicroAggregate/PersonalInformation.php new file mode 100644 index 0000000..95581ed --- /dev/null +++ b/tests/Integration/MicroAggregate/PersonalInformation.php @@ -0,0 +1,48 @@ +id = $event->profileId; + $this->name = $event->name; + } + + #[Apply(NameChanged::class)] + protected function applyNameChanged(NameChanged $event): void + { + $this->name = $event->name; + } + + public function name(): string + { + return $this->name; + } + + public function changeName(string $name): void + { + $this->recordThat(new NameChanged($this->id, $name)); + } +} diff --git a/tests/Integration/MicroAggregate/Profile.php b/tests/Integration/MicroAggregate/Profile.php new file mode 100644 index 0000000..a330923 --- /dev/null +++ b/tests/Integration/MicroAggregate/Profile.php @@ -0,0 +1,36 @@ +recordThat(new ProfileCreated($id, $name)); + + return $self; + } + + #[Apply(ProfileCreated::class)] + protected function applyProfileCreated(ProfileCreated $event): void + { + $this->id = $event->profileId; + } +} diff --git a/tests/Integration/MicroAggregate/ProfileId.php b/tests/Integration/MicroAggregate/ProfileId.php new file mode 100644 index 0000000..8e69cd5 --- /dev/null +++ b/tests/Integration/MicroAggregate/ProfileId.php @@ -0,0 +1,13 @@ +connection->getSchemaBuilder()->create('projection_profile', function (Blueprint $table): void { + $table->string('id', 36); + $table->string('name', 255); + $table->primary('id'); + }); + } + + #[Teardown] + public function drop(): void + { + $this->connection->getSchemaBuilder()->drop('projection_profile'); + } + + #[Subscribe(ProfileCreated::class)] + public function handleProfileCreated(ProfileCreated $profileCreated): void + { + $this->connection->statement( + 'INSERT INTO projection_profile (id, name) VALUES(:id, :name);', + [ + 'id' => $profileCreated->profileId->toString(), + 'name' => $profileCreated->name, + ], + ); + } + + #[Subscribe(NameChanged::class)] + public function handleNameChanged(NameChanged $nameChanged): void + { + $this->connection->statement( + 'UPDATE projection_profile SET name = :name WHERE id = :id;', + [ + 'id' => $nameChanged->profileId->toString(), + 'name' => $nameChanged->name, + ], + ); + } +} diff --git a/tests/Integration/MicroAggregate/SendEmailMock.php b/tests/Integration/MicroAggregate/SendEmailMock.php new file mode 100644 index 0000000..aace532 --- /dev/null +++ b/tests/Integration/MicroAggregate/SendEmailMock.php @@ -0,0 +1,25 @@ +connection); + $cryptographer = PersonalDataPayloadCryptographer::createWithOpenssl($cipherKeyStore); + + $store = new StreamIlluminateStore( + $this->connection, + DefaultEventSerializer::createFromPaths([__DIR__ . '/Events'], cryptographer: $cryptographer), + ); + + $manager = new DefaultRepositoryManager( + new AggregateRootRegistry(['profile' => Profile::class]), + $store, + ); + + $repository = $manager->get(Profile::class); + + $profileId = ProfileId::generate(); + $profile = Profile::create($profileId, 'John'); + + $repository->save($profile); + + $profile = $repository->load($profileId); + + self::assertInstanceOf(Profile::class, $profile); + self::assertEquals($profileId, $profile->aggregateRootId()); + self::assertSame(1, $profile->playhead()); + self::assertSame('John', $profile->name()); + + $result = $this->connection->select('SELECT * FROM event_store'); + + self::assertCount(1, $result); + self::assertArrayHasKey(0, $result); + + $row = $result[0]; + + self::assertStringNotContainsString('John', $row->event_payload); + } + + public function testRemoveKeyWithEvent(): void + { + $cipherKeyStore = new IlluminateCipherKeyStore($this->connection); + $cryptographer = PersonalDataPayloadCryptographer::createWithOpenssl($cipherKeyStore); + + $subscriptionStore = new IlluminateSubscriptionStore($this->connection); + + $store = new StreamIlluminateStore( + $this->connection, + DefaultEventSerializer::createFromPaths([__DIR__ . '/Events'], cryptographer: $cryptographer), + ); + + $manager = new DefaultRepositoryManager( + new AggregateRootRegistry(['profile' => Profile::class]), + $store, + ); + + $repository = $manager->get(Profile::class); + + $engine = new DefaultSubscriptionEngine( + $store, + $subscriptionStore, + new MetadataSubscriberAccessorRepository([new DeletePersonalDataProcessor($cipherKeyStore)]), + ); + + $engine->setup(skipBooting: true); + + $profileId = ProfileId::generate(); + $profile = Profile::create($profileId, 'John'); + + $repository->save($profile); + $engine->run(); + + $profile = $repository->load($profileId); + + self::assertInstanceOf(Profile::class, $profile); + self::assertEquals($profileId, $profile->aggregateRootId()); + self::assertSame(1, $profile->playhead()); + self::assertSame('John', $profile->name()); + + $profile->removePersonalData(); + $repository->save($profile); + $engine->run(); + + $profile = $repository->load($profileId); + + self::assertInstanceOf(Profile::class, $profile); + self::assertEquals($profileId, $profile->aggregateRootId()); + self::assertSame(2, $profile->playhead()); + self::assertSame('unknown', $profile->name()); + + $profile->changeName('hallo'); + $repository->save($profile); + + $profile = $repository->load($profileId); + + self::assertInstanceOf(Profile::class, $profile); + self::assertEquals($profileId, $profile->aggregateRootId()); + self::assertSame(3, $profile->playhead()); + self::assertSame('hallo', $profile->name()); + } + + public function testRemoveKeyWithEventAndSnapshot(): void + { + $cipherKeyStore = new IlluminateCipherKeyStore($this->connection); + $cryptographer = PersonalDataPayloadCryptographer::createWithOpenssl($cipherKeyStore); + + $subscriptionStore = new IlluminateSubscriptionStore($this->connection); + + $store = new StreamIlluminateStore( + $this->connection, + DefaultEventSerializer::createFromPaths([__DIR__ . '/Events'], cryptographer: $cryptographer), + ); + + $snapshotAdapter = new InMemorySnapshotAdapter(); + + $manager = new DefaultRepositoryManager( + new AggregateRootRegistry(['profile' => Profile::class]), + $store, + null, + DefaultSnapshotStore::createDefault( + ['default' => $snapshotAdapter], + $cryptographer, + ), + ); + + $repository = $manager->get(Profile::class); + + $engine = new DefaultSubscriptionEngine( + $store, + $subscriptionStore, + new MetadataSubscriberAccessorRepository([new DeletePersonalDataProcessor($cipherKeyStore)]), + ); + + $engine->setup(skipBooting: true); + + $profileId = ProfileId::generate(); + $profile = Profile::create($profileId, 'John'); + $profile->changeName('John 2'); + + $repository->save($profile); + $engine->run(); + + $profile = $repository->load($profileId); + + self::assertInstanceOf(Profile::class, $profile); + self::assertEquals($profileId, $profile->aggregateRootId()); + self::assertSame(2, $profile->playhead()); + self::assertSame('John 2', $profile->name()); + + $cipherKeyStore->remove($profileId->toString()); + + $profile = $repository->load($profileId); + + self::assertInstanceOf(Profile::class, $profile); + self::assertEquals($profileId, $profile->aggregateRootId()); + self::assertSame(2, $profile->playhead()); + self::assertSame('unknown', $profile->name()); + } +} diff --git a/tests/Integration/PersonalData/Processor/DeletePersonalDataProcessor.php b/tests/Integration/PersonalData/Processor/DeletePersonalDataProcessor.php new file mode 100644 index 0000000..933b7ba --- /dev/null +++ b/tests/Integration/PersonalData/Processor/DeletePersonalDataProcessor.php @@ -0,0 +1,25 @@ +cipherKeyStore->remove($event->profileId->toString()); + } +} diff --git a/tests/Integration/PersonalData/Profile.php b/tests/Integration/PersonalData/Profile.php new file mode 100644 index 0000000..87d0f2b --- /dev/null +++ b/tests/Integration/PersonalData/Profile.php @@ -0,0 +1,70 @@ +recordThat(new ProfileCreated($id, $name)); + + return $self; + } + + public function removePersonalData(): void + { + $this->recordThat(new PersonalDataRemoved($this->id)); + } + + public function changeName(string $name): void + { + $this->recordThat(new NameChanged($this->id, $name)); + } + + #[Apply(ProfileCreated::class)] + protected function applyProfileCreated(ProfileCreated $event): void + { + $this->id = $event->profileId; + $this->name = $event->name; + } + + #[Apply(PersonalDataRemoved::class)] + protected function applyPersonalDataRemoved(): void + { + $this->name = 'unknown'; + } + + #[Apply(NameChanged::class)] + protected function applyNameChanged(NameChanged $event): void + { + $this->name = $event->name; + } + + public function name(): string + { + return $this->name; + } +} diff --git a/tests/Integration/PersonalData/ProfileId.php b/tests/Integration/PersonalData/ProfileId.php new file mode 100644 index 0000000..ac3eaa6 --- /dev/null +++ b/tests/Integration/PersonalData/ProfileId.php @@ -0,0 +1,13 @@ +recordThat(new ProfileCreated($id, $name)); + + return $self; + } + + #[Apply(ProfileCreated::class)] + protected function applyProfileCreated(ProfileCreated $event): void + { + $this->id = $event->profileId; + $this->name = $event->name; + } + + public function name(): string + { + return $this->name; + } +} diff --git a/tests/Integration/Store/ProfileId.php b/tests/Integration/Store/ProfileId.php new file mode 100644 index 0000000..5df08b2 --- /dev/null +++ b/tests/Integration/Store/ProfileId.php @@ -0,0 +1,13 @@ +clock = new FrozenClock(new DateTimeImmutable('2020-01-01 00:00:00')); + $this->store = new StreamIlluminateStore( + $this->connection, + DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']), + clock: $this->clock, + ); + } + + public function testSave(): void + { + $profileId = ProfileId::generate(); + + $messages = [ + Message::create(new ProfileCreated($profileId, 'test')) + ->withHeader(new StreamNameHeader(sprintf('profile-%s', $profileId->toString()))) + ->withHeader(new PlayheadHeader(1)) + ->withHeader(new RecordedOnHeader(new DateTimeImmutable('2020-01-01 00:00:00'))), + Message::create(new ProfileCreated($profileId, 'test')) + ->withHeader(new StreamNameHeader(sprintf('profile-%s', $profileId->toString()))) + ->withHeader(new PlayheadHeader(2)) + ->withHeader(new RecordedOnHeader(new DateTimeImmutable('2020-01-02 00:00:00'))), + ]; + + $this->store->save(...$messages); + + /** @var list $result */ + $result = $this->connection->select('SELECT * FROM event_store'); + + self::assertCount(2, $result); + + $result1 = $result[0]; + + self::assertEquals(sprintf('profile-%s', $profileId->toString()), $result1->stream); + self::assertEquals('1', $result1->playhead); + self::assertStringContainsString('2020-01-01 00:00:00', $result1->recorded_on); + self::assertEquals('profile.created', $result1->event_name); + self::assertEquals( + ['profileId' => $profileId->toString(), 'name' => 'test'], + json_decode($result1->event_payload, true), + ); + + $result2 = $result[1]; + + self::assertEquals(sprintf('profile-%s', $profileId->toString()), $result2->stream); + self::assertEquals('2', $result2->playhead); + self::assertStringContainsString('2020-01-02 00:00:00', $result2->recorded_on); + self::assertEquals('profile.created', $result2->event_name); + self::assertEquals( + ['profileId' => $profileId->toString(), 'name' => 'test'], + json_decode($result2->event_payload, true), + ); + } + + public function testSaveWithIndex(): void + { + $profileId = ProfileId::generate(); + + $messages = [ + Message::create(new ProfileCreated($profileId, 'test')) + ->withHeader(new StreamNameHeader(sprintf('profile-%s', $profileId->toString()))) + ->withHeader(new PlayheadHeader(1)) + ->withHeader(new RecordedOnHeader(new DateTimeImmutable('2020-01-01 00:00:00'))) + ->withHeader(new IndexHeader(1)), + Message::create(new ProfileCreated($profileId, 'test')) + ->withHeader(new StreamNameHeader(sprintf('profile-%s', $profileId->toString()))) + ->withHeader(new PlayheadHeader(2)) + ->withHeader(new RecordedOnHeader(new DateTimeImmutable('2020-01-02 00:00:00'))) + ->withHeader(new IndexHeader(42)), + ]; + + $store = new StreamIlluminateStore( + $this->connection, + DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']), + clock: $this->clock, + config: ['keep_index' => true], + ); + + $store->save(...$messages); + + $store = new StreamIlluminateStore( + $this->connection, + DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']), + clock: $this->clock, + ); + + $store->save( + Message::create(new ProfileCreated($profileId, 'test')) + ->withHeader(new StreamNameHeader(sprintf('profile-%s', $profileId->toString()))) + ->withHeader(new PlayheadHeader(3)) + ->withHeader(new RecordedOnHeader(new DateTimeImmutable('2020-01-02 00:00:00'))), + ); + + /** @var list> $result */ + $result = $this->connection->select('SELECT * FROM event_store'); + + self::assertCount(3, $result); + + $result1 = $result[0]; + + self::assertEquals(1, $result1->id); + self::assertEquals(sprintf('profile-%s', $profileId->toString()), $result1->stream); + self::assertEquals('1', $result1->playhead); + self::assertStringContainsString('2020-01-01 00:00:00', $result1->recorded_on); + self::assertEquals('profile.created', $result1->event_name); + self::assertEquals( + ['profileId' => $profileId->toString(), 'name' => 'test'], + json_decode($result1->event_payload, true), + ); + + $result2 = $result[1]; + + self::assertEquals(42, $result2->id); + self::assertEquals(sprintf('profile-%s', $profileId->toString()), $result2->stream); + self::assertEquals('2', $result2->playhead); + self::assertStringContainsString('2020-01-02 00:00:00', $result2->recorded_on); + self::assertEquals('profile.created', $result2->event_name); + self::assertEquals( + ['profileId' => $profileId->toString(), 'name' => 'test'], + json_decode($result2->event_payload, true), + ); + + $result3 = $result[2]; + + self::assertEquals(43, $result3->id); + self::assertEquals(sprintf('profile-%s', $profileId->toString()), $result3->stream); + self::assertEquals('3', $result3->playhead); + self::assertStringContainsString('2020-01-02 00:00:00', $result3->recorded_on); + self::assertEquals('profile.created', $result3->event_name); + self::assertEquals( + ['profileId' => $profileId->toString(), 'name' => 'test'], + json_decode($result3->event_payload, true), + ); + } + + public function testSaveWithOnlyStreamName(): void + { + $messages = [ + Message::create(new ExternEvent('test 1')) + ->withHeader(new StreamNameHeader('extern')), + Message::create(new ExternEvent('test 2')) + ->withHeader(new StreamNameHeader('extern')), + ]; + + $this->store->save(...$messages); + + /** @var list> $result */ + $result = $this->connection->select('SELECT * FROM event_store'); + + self::assertCount(2, $result); + + $result1 = $result[0]; + + self::assertEquals('extern', $result1->stream); + self::assertEquals(null, $result1->playhead); + self::assertStringContainsString('2020-01-01 00:00:00', $result1->recorded_on); + self::assertEquals('extern', $result1->event_name); + self::assertEquals( + ['message' => 'test 1'], + json_decode($result1->event_payload, true), + ); + + $result2 = $result[1]; + + self::assertEquals('extern', $result2->stream); + self::assertEquals(null, $result2->playhead); + self::assertStringContainsString('2020-01-01 00:00:00', $result2->recorded_on); + self::assertEquals('extern', $result2->event_name); + self::assertEquals( + ['message' => 'test 2'], + json_decode($result2->event_payload, true), + ); + } + + public function testSaveWithTransactional(): void + { + $profileId = ProfileId::generate(); + + $messages = [ + Message::create(new ProfileCreated($profileId, 'test')) + ->withHeader(new StreamNameHeader(sprintf('profile-%s', $profileId->toString()))) + ->withHeader(new PlayheadHeader(1)) + ->withHeader(new RecordedOnHeader(new DateTimeImmutable('2020-01-01 00:00:00'))), + Message::create(new ProfileCreated($profileId, 'test')) + ->withHeader(new StreamNameHeader(sprintf('profile-%s', $profileId->toString()))) + ->withHeader(new PlayheadHeader(2)) + ->withHeader(new RecordedOnHeader(new DateTimeImmutable('2020-01-02 00:00:00'))), + ]; + + $this->store->transactional(function () use ($messages): void { + $this->store->save(...$messages); + }); + + /** @var list> $result */ + $result = $this->connection->select('SELECT * FROM event_store'); + + self::assertCount(2, $result); + + $result1 = $result[0]; + + self::assertEquals(sprintf('profile-%s', $profileId->toString()), $result1->stream); + self::assertEquals('1', $result1->playhead); + self::assertStringContainsString('2020-01-01 00:00:00', $result1->recorded_on); + self::assertEquals('profile.created', $result1->event_name); + self::assertEquals( + ['profileId' => $profileId->toString(), 'name' => 'test'], + json_decode($result1->event_payload, true), + ); + + $result2 = $result[1]; + + self::assertEquals(sprintf('profile-%s', $profileId->toString()), $result2->stream); + self::assertEquals('2', $result2->playhead); + self::assertStringContainsString('2020-01-02 00:00:00', $result2->recorded_on); + self::assertEquals('profile.created', $result2->event_name); + self::assertEquals( + ['profileId' => $profileId->toString(), 'name' => 'test'], + json_decode($result2->event_payload, true), + ); + } + + public function testArchive(): void + { + $profileId = ProfileId::generate(); + + $messages = [ + Message::create(new ProfileCreated($profileId, 'test')) + ->withHeader(new StreamNameHeader(sprintf('profile-%s', $profileId->toString()))) + ->withHeader(new PlayheadHeader(1)) + ->withHeader(new EventIdHeader('1')) + ->withHeader(new RecordedOnHeader(new DateTimeImmutable('2020-01-01 00:00:00'))), + Message::create(new ProfileCreated($profileId, 'test')) + ->withHeader(new StreamNameHeader(sprintf('profile-%s', $profileId->toString()))) + ->withHeader(new PlayheadHeader(2)) + ->withHeader(new EventIdHeader('2')) + ->withHeader(new RecordedOnHeader(new DateTimeImmutable('2020-01-02 00:00:00'))), + ]; + + $this->store->save(...$messages); + $this->store->archive( + new Criteria( + new StreamCriterion(sprintf('profile-%s', $profileId->toString())), + new ToPlayheadCriterion(2), + ), + ); + + /** @var list> $result */ + $result = $this->connection->select('SELECT * FROM event_store ORDER BY id'); + + self::assertCount(2, $result); + + $result1 = $result[0]; + + self::assertEquals(sprintf('profile-%s', $profileId->toString()), $result1->stream); + self::assertEquals('1', $result1->playhead); + self::assertStringContainsString('2020-01-01 00:00:00', $result1->recorded_on); + self::assertEquals('profile.created', $result1->event_name); + self::assertEquals( + ['profileId' => $profileId->toString(), 'name' => 'test'], + json_decode($result1->event_payload, true), + ); + + self::assertEquals('1', $result1->archived); + + $result2 = $result[1]; + + self::assertEquals(sprintf('profile-%s', $profileId->toString()), $result2->stream); + self::assertEquals('2', $result2->playhead); + self::assertStringContainsString('2020-01-02 00:00:00', $result2->recorded_on); + self::assertEquals('profile.created', $result2->event_name); + self::assertEquals( + ['profileId' => $profileId->toString(), 'name' => 'test'], + json_decode($result2->event_payload, true), + ); + + self::assertEquals('0', $result2->archived); + } + + public function testUniqueStreamNameAndPlayheadConstraint(): void + { + $this->expectException(UniqueConstraintViolation::class); + + $profileId = ProfileId::generate(); + + $messages = [ + Message::create(new ProfileCreated($profileId, 'test')) + ->withHeader(new StreamNameHeader(sprintf('profile-%s', $profileId->toString()))) + ->withHeader(new PlayheadHeader(1)) + ->withHeader(new RecordedOnHeader(new DateTimeImmutable('2020-01-01 00:00:00'))), + Message::create(new ProfileCreated($profileId, 'test')) + ->withHeader(new StreamNameHeader(sprintf('profile-%s', $profileId->toString()))) + ->withHeader(new PlayheadHeader(1)) + ->withHeader(new RecordedOnHeader(new DateTimeImmutable('2020-01-01 00:00:00'))), + ]; + + $this->store->save(...$messages); + } + + public function testUniqueEventIdConstraint(): void + { + $this->expectException(UniqueConstraintViolation::class); + + $profileId = ProfileId::generate(); + + $messages = [ + Message::create(new ProfileCreated($profileId, 'test')) + ->withHeader(new StreamNameHeader(sprintf('profile-%s', $profileId->toString()))) + ->withHeader(new EventIdHeader('1')) + ->withHeader(new RecordedOnHeader(new DateTimeImmutable('2020-01-01 00:00:00'))), + Message::create(new ProfileCreated($profileId, 'test')) + ->withHeader(new StreamNameHeader(sprintf('profile-%s', $profileId->toString()))) + ->withHeader(new EventIdHeader('1')) + ->withHeader(new RecordedOnHeader(new DateTimeImmutable('2020-01-01 00:00:00'))), + ]; + + $this->store->save(...$messages); + } + + public function testSave10000Messages(): void + { + $profileId = ProfileId::generate(); + + $messages = []; + + for ($i = 1; $i <= 10000; $i++) { + $messages[] = Message::create(new ProfileCreated($profileId, 'test')) + ->withHeader(new StreamNameHeader(sprintf('profile-%s', $profileId->toString()))) + ->withHeader(new PlayheadHeader($i)) + ->withHeader(new RecordedOnHeader(new DateTimeImmutable('2020-01-01 00:00:00'))); + } + + $this->store->save(...$messages); + + /** @var int $result */ + $result = $this->connection->scalar('SELECT COUNT(*) FROM event_store'); + + self::assertEquals(10000, $result); + } + + public function testLoad(): void + { + $profileId = ProfileId::generate(); + + $message = Message::create(new ProfileCreated($profileId, 'test')) + ->withHeader(new StreamNameHeader(sprintf('profile-%s', $profileId->toString()))) + ->withHeader(new PlayheadHeader(1)) + ->withHeader(new RecordedOnHeader(new DateTimeImmutable('2020-01-01 00:00:00'))); + + $this->store->save($message); + + $stream = null; + + try { + $stream = $this->store->load(); + + self::assertSame(1, $stream->index()); + self::assertSame(0, $stream->position()); + + $loadedMessage = $stream->current(); + + self::assertInstanceOf(Message::class, $loadedMessage); + self::assertNotSame($message, $loadedMessage); + self::assertEquals($message->event(), $loadedMessage->event()); + self::assertEquals( + $message->header(StreamNameHeader::class)->streamName, + $loadedMessage->header(StreamNameHeader::class)->streamName, + ); + self::assertEquals( + $message->header(PlayheadHeader::class)->playhead, + $loadedMessage->header(PlayheadHeader::class)->playhead, + ); + self::assertEquals( + $message->header(RecordedOnHeader::class)->recordedOn, + $loadedMessage->header(RecordedOnHeader::class)->recordedOn, + ); + } finally { + $stream?->close(); + } + } + + public function testLoadWithWildcard(): void + { + $profileId1 = ProfileId::generate(); + $profileId2 = ProfileId::generate(); + + $messages = [ + Message::create(new ProfileCreated($profileId1, 'test')) + ->withHeader(new StreamNameHeader(sprintf('profile-%s', $profileId1->toString()))) + ->withHeader(new PlayheadHeader(1)) + ->withHeader(new RecordedOnHeader(new DateTimeImmutable('2020-01-01 00:00:00'))), + Message::create(new ProfileCreated($profileId2, 'test')) + ->withHeader(new StreamNameHeader(sprintf('profile-%s', $profileId2->toString()))) + ->withHeader(new PlayheadHeader(1)) + ->withHeader(new RecordedOnHeader(new DateTimeImmutable('2020-01-01 00:00:00'))), + Message::create(new ExternEvent('test message')) + ->withHeader(new StreamNameHeader('foo')), + ]; + + $this->store->save(...$messages); + + $stream = null; + + try { + $stream = $this->store->load(new Criteria(new StreamCriterion('profile-*'))); + + $messages = iterator_to_array($stream); + + self::assertCount(2, $messages); + } finally { + $stream?->close(); + } + + try { + $stream = $this->store->load(new Criteria(new StreamCriterion('*-*'))); + + $messages = iterator_to_array($stream); + + self::assertCount(2, $messages); + } finally { + $stream?->close(); + } + } + + public function testStreams(): void + { + $profileId = ProfileId::fromString('0190e47e-77e9-7b90-bf62-08bbf0ab9b4b'); + + $messages = [ + Message::create(new ProfileCreated($profileId, 'test')) + ->withHeader(new StreamNameHeader(sprintf('profile-%s', $profileId->toString()))) + ->withHeader(new PlayheadHeader(1)) + ->withHeader(new RecordedOnHeader(new DateTimeImmutable('2020-01-01 00:00:00'))), + Message::create(new ProfileCreated($profileId, 'test')) + ->withHeader(new StreamNameHeader(sprintf('profile-%s', $profileId->toString()))) + ->withHeader(new PlayheadHeader(2)) + ->withHeader(new RecordedOnHeader(new DateTimeImmutable('2020-01-01 00:00:00'))), + Message::create(new ExternEvent('test message')) + ->withHeader(new StreamNameHeader('foo')), + ]; + + $this->store->save(...$messages); + + $streams = $this->store->streams(); + + self::assertEquals([ + 'foo', + 'profile-0190e47e-77e9-7b90-bf62-08bbf0ab9b4b', + ], $streams); + } + + public function testRemove(): void + { + $profileId = ProfileId::fromString('0190e47e-77e9-7b90-bf62-08bbf0ab9b4b'); + + $messages = [ + Message::create(new ProfileCreated($profileId, 'test')) + ->withHeader(new StreamNameHeader(sprintf('profile-%s', $profileId->toString()))) + ->withHeader(new PlayheadHeader(1)) + ->withHeader(new RecordedOnHeader(new DateTimeImmutable('2020-01-01 00:00:00'))), + Message::create(new ProfileCreated($profileId, 'test')) + ->withHeader(new StreamNameHeader(sprintf('profile-%s', $profileId->toString()))) + ->withHeader(new PlayheadHeader(2)) + ->withHeader(new RecordedOnHeader(new DateTimeImmutable('2020-01-01 00:00:00'))), + Message::create(new ExternEvent('test message')) + ->withHeader(new StreamNameHeader('foo')), + ]; + + $this->store->save(...$messages); + + $streams = $this->store->streams(); + + self::assertEquals([ + 'foo', + 'profile-0190e47e-77e9-7b90-bf62-08bbf0ab9b4b', + ], $streams); + + $this->store->remove(new Criteria(new StreamCriterion('profile-*'))); + + $streams = $this->store->streams(); + + self::assertEquals(['foo'], $streams); + } +} diff --git a/tests/Integration/Subscription/Events/AdminPromoted.php b/tests/Integration/Subscription/Events/AdminPromoted.php new file mode 100644 index 0000000..3416069 --- /dev/null +++ b/tests/Integration/Subscription/Events/AdminPromoted.php @@ -0,0 +1,17 @@ +recordThat(new ProfileCreated($id, $name)); + + return $self; + } + + public function changeName(string $name): void + { + $this->recordThat(new NameChanged($this->id, $name)); + } + + public function promoteToAdmin(): void + { + $this->recordThat(new AdminPromoted($this->id)); + } + + #[Apply] + protected function applyProfileCreated(ProfileCreated $event): void + { + $this->id = $event->profileId; + $this->name = $event->name; + $this->isAdmin = false; + } + + #[Apply] + protected function applyNameChanged(NameChanged $event): void + { + $this->name = $event->name; + } + + #[Apply] + protected function applyAdminPromoted(AdminPromoted $event): void + { + $this->isAdmin = true; + } + + public function name(): string + { + return $this->name; + } + + public function isAdmin(): bool + { + return $this->isAdmin; + } +} diff --git a/tests/Integration/Subscription/ProfileId.php b/tests/Integration/Subscription/ProfileId.php new file mode 100644 index 0000000..e06f8bd --- /dev/null +++ b/tests/Integration/Subscription/ProfileId.php @@ -0,0 +1,13 @@ +setupError) { + throw new RuntimeException('setup error'); + } + } + + #[Teardown] + public function teardown(): void + { + if ($this->teardownError) { + throw new RuntimeException('teardown error'); + } + } + + #[Subscribe('*')] + public function subscribe(): void + { + if ($this->subscribeError) { + throw new RuntimeException('subscribe error'); + } + } +} diff --git a/tests/Integration/Subscription/Subscriber/ErrorProducerWithSelfRecoverySubscriber.php b/tests/Integration/Subscription/Subscriber/ErrorProducerWithSelfRecoverySubscriber.php new file mode 100644 index 0000000..19a2c90 --- /dev/null +++ b/tests/Integration/Subscription/Subscriber/ErrorProducerWithSelfRecoverySubscriber.php @@ -0,0 +1,64 @@ +setupError) { + throw new RuntimeException('setup error'); + } + } + + #[Teardown] + public function teardown(): void + { + if ($this->teardownError) { + throw new RuntimeException('teardown error'); + } + } + + #[Subscribe('*')] + public function subscribe(): void + { + if ($this->subscribeError) { + throw new RuntimeException('subscribe error'); + } + } + + #[OnFailed] + public function onFailed(Message $message, Throwable $throwable): void + { + $this->erroredMessage = $message; + $this->erroredThrowable = $throwable; + + if ($this->onFailedError) { + throw new RuntimeException('on failed error'); + } + } +} diff --git a/tests/Integration/Subscription/Subscriber/LookupSubscriber.php b/tests/Integration/Subscription/Subscriber/LookupSubscriber.php new file mode 100644 index 0000000..dae84bd --- /dev/null +++ b/tests/Integration/Subscription/Subscriber/LookupSubscriber.php @@ -0,0 +1,83 @@ +currentStream() + ->events( + ProfileCreated::class, + NameChanged::class, + ) + ->fetchAll(); + + $state = (new Reducer()) + ->initState(['name' => null]) + ->when(ProfileCreated::class, static function (Message $message): array { + return ['name' => $message->event()->name]; + }) + ->when(NameChanged::class, static function (Message $message): array { + return ['name' => $message->event()->name]; + }) + ->reduce($messages); + + $this->connection->statement(<<tableName()} (id, name) VALUES (:id, :name); +SQL, + [ + 'id' => $event->profileId->toString(), + 'name' => $state['name'], + ], + ); + } + + #[Setup] + public function create(): void + { + $this->connection->getSchemaBuilder()->create($this->tableName(), function (Blueprint $table): void { + $table->string('id', 36); + $table->string('name', 255); + $table->primary('id'); + }); + } + + #[Teardown] + public function drop(): void + { + $this->connection->getSchemaBuilder()->drop($this->tableName()); + } + + private function tableName(): string + { + return 'projection_' . $this->subscriberId(); + } +} diff --git a/tests/Integration/Subscription/Subscriber/MigrateAggregateToStreamStoreSubscriber.php b/tests/Integration/Subscription/Subscriber/MigrateAggregateToStreamStoreSubscriber.php new file mode 100644 index 0000000..0466788 --- /dev/null +++ b/tests/Integration/Subscription/Subscriber/MigrateAggregateToStreamStoreSubscriber.php @@ -0,0 +1,93 @@ + */ + private array $messages = []; + + /** @var list */ + private readonly array $middlewares; + + public function __construct( + private readonly StreamStore $targetStore, + private readonly Connection $connection, + ) { + $this->middlewares = [new AggregateToStreamHeaderTranslator()]; + } + + #[Subscribe('*')] + public function handle(Message $message): void + { + $this->messages[] = $message; + } + + public function beginBatch(): void + { + $this->messages = []; + } + + public function commitBatch(): void + { + $pipeline = new Pipe($this->messages, ...$this->middlewares); + $this->messages = []; + + $this->targetStore->save(...$pipeline); + } + + public function rollbackBatch(): void + { + $this->messages = []; + } + + public function forceCommit(): bool + { + return count($this->messages) >= 10_000; + } + + #[Setup] + public function setup(): void + { + $this->connection->getSchemaBuilder()->create('new_eventstore', function (Blueprint $table): void { + $table->bigInteger('id', true); + $table->string('stream', 255); + $table->integer('playhead')->nullable(); + $table->string('event_id', 255); + $table->string('event_name', 255); + $table->json('event_payload'); + $table->dateTimeTz('recorded_on'); + $table->boolean('archived')->default(false); + $table->json('custom_headers'); + + $table->unique('event_id'); + $table->unique(['stream', 'playhead']); + $table->unique(['stream', 'playhead', 'archived']); + }); + } + + #[Teardown] + public function teardown(): void + { + $this->connection->getSchemaBuilder()->drop('new_eventstore'); + } +} diff --git a/tests/Integration/Subscription/Subscriber/ProfileNewProjection.php b/tests/Integration/Subscription/Subscriber/ProfileNewProjection.php new file mode 100644 index 0000000..4d8ae9c --- /dev/null +++ b/tests/Integration/Subscription/Subscriber/ProfileNewProjection.php @@ -0,0 +1,58 @@ +connection->getSchemaBuilder()->create($this->tableName(), function (Blueprint $table): void { + $table->string('id', 36); + $table->string('firstname', 255); + $table->primary('id'); + }); + } + + #[Teardown] + public function drop(): void + { + $this->connection->getSchemaBuilder()->drop($this->tableName()); + } + + #[Subscribe(ProfileCreated::class)] + public function handleProfileCreated(ProfileCreated $profileCreated): void + { + $this->connection->statement( + 'INSERT INTO ' . $this->tableName() . ' (id, firstname) VALUES(:id, :firstname);', + [ + 'id' => $profileCreated->profileId->toString(), + 'firstname' => $profileCreated->name, + ], + ); + } + + private function tableName(): string + { + return 'projection_' . $this->subscriberId(); + } +} diff --git a/tests/Integration/Subscription/Subscriber/ProfileProcessor.php b/tests/Integration/Subscription/Subscriber/ProfileProcessor.php new file mode 100644 index 0000000..d24e24a --- /dev/null +++ b/tests/Integration/Subscription/Subscriber/ProfileProcessor.php @@ -0,0 +1,50 @@ +repositoryManager->get(Profile::class); + + $profile = $repository->load($profileCreated->profileId); + + $profile->changeName('admin'); + + $repository->save($profile); + } + + #[Subscribe(NameChanged::class)] + public function handleNameChanged(NameChanged $nameChanged): void + { + $repository = $this->repositoryManager->get(Profile::class); + + $profile = $repository->load($nameChanged->profileId); + + if ($profile->name() !== 'admin') { + return; + } + + $profile->promoteToAdmin(); + + $repository->save($profile); + } +} diff --git a/tests/Integration/Subscription/Subscriber/ProfileProjection.php b/tests/Integration/Subscription/Subscriber/ProfileProjection.php new file mode 100644 index 0000000..e163e4e --- /dev/null +++ b/tests/Integration/Subscription/Subscriber/ProfileProjection.php @@ -0,0 +1,79 @@ +connection->getSchemaBuilder()->create($this->tableName(), function (Blueprint $table): void { + $table->string('id', 36); + $table->string('name', 255); + $table->primary('id'); + }); + } + + #[Teardown] + public function drop(): void + { + $this->connection->getSchemaBuilder()->drop($this->tableName()); + } + + #[Subscribe(ProfileCreated::class)] + public function handleProfileCreated(ProfileCreated $profileCreated): void + { + $this->connection->statement( + 'INSERT INTO ' . $this->tableName() . ' (id, name) VALUES(:id, :name);', + [ + 'id' => $profileCreated->profileId->toString(), + 'name' => $profileCreated->name, + ], + ); + } + + private function tableName(): string + { + return 'projection_' . $this->subscriberId(); + } + + public function beginBatch(): void + { + $this->connection->beginTransaction(); + } + + public function commitBatch(): void + { + $this->connection->commit(); + } + + public function rollbackBatch(): void + { + $this->connection->rollBack(); + } + + public function forceCommit(): bool + { + return false; + } +} diff --git a/tests/Integration/Subscription/Subscriber/ProfileProjectionWithCleanup.php b/tests/Integration/Subscription/Subscriber/ProfileProjectionWithCleanup.php new file mode 100644 index 0000000..5e5818f --- /dev/null +++ b/tests/Integration/Subscription/Subscriber/ProfileProjectionWithCleanup.php @@ -0,0 +1,80 @@ +connection->getSchemaBuilder()->create($this->tableName(), function (Blueprint $table): void { + $table->string('id', 36); + $table->string('name', 255); + $table->primary('id'); + }); + } + + #[Cleanup] + public function drop(): Generator + { + yield new DropTableTask($this->tableName()); + } + + #[Subscribe(ProfileCreated::class)] + public function handleProfileCreated(ProfileCreated $profileCreated): void + { + $this->connection->statement( + 'INSERT INTO ' . $this->tableName() . ' (id, name) VALUES(:id, :name);', + [ + 'id' => $profileCreated->profileId->toString(), + 'name' => $profileCreated->name, + ], + ); + } + + private function tableName(): string + { + return 'projection_' . self::TABLE_NAME; + } + + public function beginBatch(): void + { + $this->connection->beginTransaction(); + } + + public function commitBatch(): void + { + $this->connection->commit(); + } + + public function rollbackBatch(): void + { + $this->connection->rollBack(); + } + + public function forceCommit(): bool + { + return false; + } +} diff --git a/tests/Integration/Subscription/SubscriptionTest.php b/tests/Integration/Subscription/SubscriptionTest.php new file mode 100644 index 0000000..c258c77 --- /dev/null +++ b/tests/Integration/Subscription/SubscriptionTest.php @@ -0,0 +1,1511 @@ +projectionConnection = DatabaseManager::createConnection(forceNewConnection: true); + } + + public function tearDown(): void + { + parent::tearDown(); + + gc_collect_cycles(); + } + + public function testHappyPath(): void + { + $store = new StreamIlluminateStore( + $this->connection, + DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']), + ); + + $clock = new FrozenClock(new DateTimeImmutable('2021-01-01T00:00:00')); + + $subscriptionStore = new IlluminateSubscriptionStore( + $this->connection, + $clock, + ); + + $manager = new DefaultRepositoryManager( + new AggregateRootRegistry(['profile' => Profile::class]), + $store, + ); + + $repository = $manager->get(Profile::class); + + $subscriberRepository = new MetadataSubscriberAccessorRepository([new ProfileProjection($this->projectionConnection)]); + + $engine = new DefaultSubscriptionEngine( + new EventFilteredStoreMessageLoader($store, new AttributeEventMetadataFactory(), $subscriberRepository), + $subscriptionStore, + $subscriberRepository, + ); + + self::assertEquals( + [ + new Subscription( + 'profile_1', + 'projector', + lastSavedAt: new DateTimeImmutable('2021-01-01T00:00:00'), + ), + ], + $engine->subscriptions(), + ); + + $result = $engine->setup(); + + self::assertEquals([], $result->errors); + + $result = $engine->boot(); + + self::assertEquals(0, $result->processedMessages); + self::assertEquals([], $result->errors); + + self::assertEquals( + [ + new Subscription( + 'profile_1', + 'projector', + RunMode::FromBeginning, + Status::Active, + lastSavedAt: new DateTimeImmutable('2021-01-01T00:00:00'), + ), + ], + $engine->subscriptions(), + ); + + $profileId = ProfileId::generate(); + $profile = Profile::create($profileId, 'John'); + $repository->save($profile); + + $result = $engine->run(); + + self::assertEquals(1, $result->processedMessages); + self::assertEquals([], $result->errors); + + self::assertEquals( + [ + new Subscription( + 'profile_1', + 'projector', + RunMode::FromBeginning, + Status::Active, + 1, + lastSavedAt: new DateTimeImmutable('2021-01-01T00:00:00'), + ), + ], + $engine->subscriptions(), + ); + + $result = $this->projectionConnection->selectOne( + 'SELECT * FROM projection_profile_1 WHERE id = ?', + [$profileId->toString()], + ); + + self::assertIsObject($result); + self::assertObjectHasProperty('id', $result); + self::assertSame($profileId->toString(), $result->id); + self::assertSame('John', $result->name); + + $result = $engine->remove(); + self::assertEquals([], $result->errors); + + self::assertEquals( + [ + new Subscription( + 'profile_1', + 'projector', + RunMode::FromBeginning, + Status::New, + lastSavedAt: new DateTimeImmutable('2021-01-01T00:00:00'), + ), + ], + $engine->subscriptions(), + ); + + + self::assertFalse( + $this->projectionConnection->getSchemaBuilder()->hasTable('projection_profile_1'), + ); + } + + public function testGapResolver(): void + { + $store = new StreamIlluminateStore( + $this->connection, + DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']), + ); + + $clock = new FrozenClock(new DateTimeImmutable('2021-01-01T00:00:00')); + + $subscriptionStore = new IlluminateSubscriptionStore( + $this->connection, + $clock, + ); + + $manager = new DefaultRepositoryManager( + new AggregateRootRegistry(['profile' => Profile::class]), + $store, + ); + + $repository = $manager->get(Profile::class); + + $subscriberRepository = new MetadataSubscriberAccessorRepository([new ProfileProjection($this->projectionConnection)]); + + $engine = new DefaultSubscriptionEngine( + new GapResolverStoreMessageLoader($store), + $subscriptionStore, + $subscriberRepository, + ); + + self::assertEquals( + [ + new Subscription( + 'profile_1', + 'projector', + lastSavedAt: new DateTimeImmutable('2021-01-01T00:00:00'), + ), + ], + $engine->subscriptions(), + ); + + $result = $engine->setup(); + + self::assertEquals([], $result->errors); + + $result = $engine->boot(); + + self::assertEquals(0, $result->processedMessages); + self::assertEquals([], $result->errors); + + self::assertEquals( + [ + new Subscription( + 'profile_1', + 'projector', + RunMode::FromBeginning, + Status::Active, + lastSavedAt: new DateTimeImmutable('2021-01-01T00:00:00'), + ), + ], + $engine->subscriptions(), + ); + + $profileId = ProfileId::generate(); + $profile = Profile::create($profileId, 'John'); + $repository->save($profile); + + $result = $engine->run(); + + self::assertEquals(1, $result->processedMessages); + self::assertEquals([], $result->errors); + + self::assertEquals( + [ + new Subscription( + 'profile_1', + 'projector', + RunMode::FromBeginning, + Status::Active, + 1, + lastSavedAt: new DateTimeImmutable('2021-01-01T00:00:00'), + ), + ], + $engine->subscriptions(), + ); + + $result = $this->projectionConnection->selectOne( + 'SELECT * FROM projection_profile_1 WHERE id = ?', + [$profileId->toString()], + ); + + self::assertIsObject($result); + self::assertObjectHasProperty('id', $result); + self::assertSame($profileId->toString(), $result->id); + self::assertSame('John', $result->name); + + $result = $engine->remove(); + self::assertEquals([], $result->errors); + + self::assertEquals( + [ + new Subscription( + 'profile_1', + 'projector', + RunMode::FromBeginning, + Status::New, + lastSavedAt: new DateTimeImmutable('2021-01-01T00:00:00'), + ), + ], + $engine->subscriptions(), + ); + + self::assertFalse( + $this->projectionConnection->getSchemaBuilder()->hasTable('projection_profile_1'), + ); + } + + public function testErrorHandling(): void + { + $clock = new FrozenClock(new DateTimeImmutable('2021-01-01T00:00:00')); + + $store = new StreamIlluminateStore( + $this->connection, + DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']), + ); + + $subscriptionStore = new IlluminateSubscriptionStore( + $this->connection, + $clock, + ); + + + + $manager = new DefaultRepositoryManager( + new AggregateRootRegistry(['profile' => Profile::class]), + $store, + ); + + $subscriber = new ErrorProducerSubscriber(); + + $engine = new DefaultSubscriptionEngine( + $store, + $subscriptionStore, + new MetadataSubscriberAccessorRepository([$subscriber]), + new ClockBasedRetryStrategy( + $clock, + ClockBasedRetryStrategy::DEFAULT_BASE_DELAY, + ClockBasedRetryStrategy::DEFAULT_DELAY_FACTOR, + 2, + ), + ); + + $result = $engine->setup(); + self::assertEquals([], $result->errors); + + $result = $engine->boot(); + self::assertEquals(0, $result->processedMessages); + self::assertEquals([], $result->errors); + + $subscription = self::findSubscription($engine->subscriptions(), 'error_producer'); + + self::assertEquals(Status::Active, $subscription->status()); + self::assertEquals(null, $subscription->subscriptionError()); + self::assertEquals(0, $subscription->retryAttempt()); + + $repository = $manager->get(Profile::class); + + $profile = Profile::create(ProfileId::generate(), 'John'); + $repository->save($profile); + + $subscriber->subscribeError = true; + + // first run, error + + $result = $engine->run(); + + self::assertEquals(1, $result->processedMessages); + self::assertCount(1, $result->errors); + + $error = $result->errors[0]; + + self::assertEquals('error_producer', $error->subscriptionId); + self::assertEquals('subscribe error', $error->message); + + $subscription = self::findSubscription($engine->subscriptions(), 'error_producer'); + + self::assertEquals(Status::Error, $subscription->status()); + self::assertEquals('subscribe error', $subscription->subscriptionError()?->errorMessage); + self::assertEquals(Status::Active, $subscription->subscriptionError()?->previousStatus); + self::assertEquals(0, $subscription->retryAttempt()); + + // second run, time has not passed yet, no retry, no error + + $result = $engine->run(); + + self::assertEquals(0, $result->processedMessages); + self::assertEquals([], $result->errors); + + $subscription = self::findSubscription($engine->subscriptions(), 'error_producer'); + + self::assertEquals(Status::Error, $subscription->status()); + self::assertEquals('subscribe error', $subscription->subscriptionError()?->errorMessage); + self::assertEquals(Status::Active, $subscription->subscriptionError()?->previousStatus); + self::assertEquals(0, $subscription->retryAttempt()); + + // third run, time has passed, 1. retry, error again + + $clock->sleep(5); + $result = $engine->run(); + + self::assertEquals(1, $result->processedMessages); + self::assertCount(1, $result->errors); + + $error = $result->errors[0]; + + self::assertEquals('error_producer', $error->subscriptionId); + self::assertEquals('subscribe error', $error->message); + + $subscription = self::findSubscription($engine->subscriptions(), 'error_producer'); + + self::assertEquals(Status::Error, $subscription->status()); + self::assertEquals('subscribe error', $subscription->subscriptionError()?->errorMessage); + self::assertEquals(Status::Active, $subscription->subscriptionError()?->previousStatus); + self::assertEquals(1, $subscription->retryAttempt()); + + // fourth run, time has passed, 2. retry, max retries reached, failed + + $clock->sleep(10); + $result = $engine->run(); + + self::assertEquals(1, $result->processedMessages); + self::assertCount(1, $result->errors); + + $error = $result->errors[0]; + + self::assertEquals('error_producer', $error->subscriptionId); + self::assertEquals('subscribe error', $error->message); + + $subscription = self::findSubscription($engine->subscriptions(), 'error_producer'); + + self::assertEquals(Status::Failed, $subscription->status()); + self::assertEquals('subscribe error', $subscription->subscriptionError()?->errorMessage); + self::assertEquals(Status::Active, $subscription->subscriptionError()?->previousStatus); + self::assertEquals(2, $subscription->retryAttempt()); + + // fifth run, time has passed, skip failed subscription + + $clock->sleep(20); + $result = $engine->run(); + + self::assertEquals(0, $result->processedMessages); + self::assertEquals([], $result->errors); + + $subscription = self::findSubscription($engine->subscriptions(), 'error_producer'); + + self::assertEquals(Status::Failed, $subscription->status()); + self::assertEquals('subscribe error', $subscription->subscriptionError()?->errorMessage); + self::assertEquals(Status::Active, $subscription->subscriptionError()?->previousStatus); + self::assertEquals(2, $subscription->retryAttempt()); + + // reactivated subscription + + $engine->reactivate(new SubscriptionEngineCriteria( + ids: ['error_producer'], + )); + + $subscription = self::findSubscription($engine->subscriptions(), 'error_producer'); + + self::assertEquals(Status::Active, $subscription->status()); + self::assertEquals(null, $subscription->subscriptionError()); + self::assertEquals(0, $subscription->retryAttempt()); + + // sixth run, error again + + $result = $engine->run(); + + self::assertEquals(1, $result->processedMessages); + self::assertCount(1, $result->errors); + + $error = $result->errors[0]; + + self::assertEquals('error_producer', $error->subscriptionId); + self::assertEquals('subscribe error', $error->message); + + $subscription = self::findSubscription($engine->subscriptions(), 'error_producer'); + + self::assertEquals(Status::Error, $subscription->status()); + self::assertEquals('subscribe error', $subscription->subscriptionError()?->errorMessage); + self::assertEquals(Status::Active, $subscription->subscriptionError()?->previousStatus); + self::assertEquals(0, $subscription->retryAttempt()); + + // seventh run, time has passed, error fixed, 1. retry, no error + + $clock->sleep(5); + $subscriber->subscribeError = false; + + $result = $engine->run(); + + self::assertEquals(1, $result->processedMessages); + self::assertEquals([], $result->errors); + + $subscription = self::findSubscription($engine->subscriptions(), 'error_producer'); + + self::assertEquals(Status::Active, $subscription->status()); + self::assertEquals(null, $subscription->subscriptionError()); + self::assertEquals(0, $subscription->retryAttempt()); + } + + public function testSelfRecovery(): void + { + $clock = new FrozenClock(new DateTimeImmutable('2021-01-01T00:00:00')); + + $store = new StreamIlluminateStore( + $this->connection, + DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']), + ); + + $subscriptionStore = new IlluminateSubscriptionStore( + $this->connection, + $clock, + ); + + + + $manager = new DefaultRepositoryManager( + new AggregateRootRegistry(['profile' => Profile::class]), + $store, + ); + + $subscriber = new ErrorProducerWithSelfRecoverySubscriber(); + + $engine = new DefaultSubscriptionEngine( + $store, + $subscriptionStore, + new MetadataSubscriberAccessorRepository([$subscriber]), + new ClockBasedRetryStrategy( + $clock, + ClockBasedRetryStrategy::DEFAULT_BASE_DELAY, + ClockBasedRetryStrategy::DEFAULT_DELAY_FACTOR, + 0, + ), + ); + + $result = $engine->setup(skipBooting: true); + self::assertEquals([], $result->errors); + + // add data + + $repository = $manager->get(Profile::class); + + $profile = Profile::create(ProfileId::generate(), 'John'); + $repository->save($profile); + + $subscriber->subscribeError = true; + + // first run, failed -> self recovery + + $result = $engine->run(); + + self::assertEquals(1, $result->processedMessages); + self::assertCount(1, $result->errors); + + $error = $result->errors[0]; + + self::assertEquals('error_producer', $error->subscriptionId); + self::assertEquals('subscribe error', $error->message); + + $subscription = self::findSubscription($engine->subscriptions(), 'error_producer'); + + self::assertEquals(Status::Active, $subscription->status()); + self::assertEquals(0, $subscription->retryAttempt()); + self::assertEquals(1, $subscription->position()); + + // change data + + $profile->changeName('Jane'); + $repository->save($profile); + + // second run, failed -> self recovery failed + + $subscriber->onFailedError = true; + $result = $engine->run(); + + self::assertEquals(1, $result->processedMessages); + self::assertCount(1, $result->errors); + + $error = $result->errors[0]; + + self::assertEquals('error_producer', $error->subscriptionId); + self::assertEquals('subscribe error', $error->message); + + $subscription = self::findSubscription($engine->subscriptions(), 'error_producer'); + + self::assertEquals(Status::Failed, $subscription->status()); + self::assertEquals(0, $subscription->retryAttempt()); + self::assertEquals(1, $subscription->position()); + } + + public function testLargeErrorMessage(): void + { + $clock = new FrozenClock(new DateTimeImmutable('2021-01-01T00:00:00')); + + $store = new StreamIlluminateStore( + $this->connection, + DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']), + ); + + $subscriptionStore = new IlluminateSubscriptionStore( + $this->connection, + $clock, + ); + + + + $manager = new DefaultRepositoryManager( + new AggregateRootRegistry(['profile' => Profile::class]), + $store, + ); + + $subscriber = new #[Subscriber('error_producer', RunMode::FromBeginning)] + class { + public bool $subscribeError = false; + + #[Setup] + public function setup(): void + { + } + + #[Teardown] + public function teardown(): void + { + } + + #[Subscribe('*')] + public function subscribe(): void + { + if ($this->subscribeError) { + throw new RuntimeException('subscribe error: as an extra long message exceeding 255 varchar configuration, as an extra long message exceeding 255 varchar configuration, as an extra long message exceeding 255 varchar configuration, as an extra long message exceeding 255 varchar configuration, as an extra long message exceeding 255 varchar configuration, as an extra long message exceeding 255 varchar configuration, as an extra long message exceeding 255 varchar configuration, as an extra long message exceeding 255 varchar configuration, as an extra long message exceeding 255 varchar configuration.'); + } + } + }; + + $engine = new DefaultSubscriptionEngine( + $store, + $subscriptionStore, + new MetadataSubscriberAccessorRepository([$subscriber]), + new ClockBasedRetryStrategy( + $clock, + ClockBasedRetryStrategy::DEFAULT_BASE_DELAY, + ClockBasedRetryStrategy::DEFAULT_DELAY_FACTOR, + 2, + ), + ); + + $result = $engine->setup(); + self::assertEquals([], $result->errors); + + $result = $engine->boot(); + self::assertEquals(0, $result->processedMessages); + self::assertEquals([], $result->errors); + + $subscription = self::findSubscription($engine->subscriptions(), 'error_producer'); + + self::assertEquals(Status::Active, $subscription->status()); + self::assertEquals(null, $subscription->subscriptionError()); + self::assertEquals(0, $subscription->retryAttempt()); + + $repository = $manager->get(Profile::class); + + $profile = Profile::create(ProfileId::generate(), 'John'); + $repository->save($profile); + + $subscriber->subscribeError = true; + + $result = $engine->run(); + + self::assertEquals(1, $result->processedMessages); + self::assertCount(1, $result->errors); + + $error = $result->errors[0]; + + self::assertEquals('error_producer', $error->subscriptionId); + self::assertEquals( + 'subscribe error: as an extra long message exceeding 255 varchar configuration, as an extra long message exceeding 255 varchar configuration, as an extra long message exceeding 255 varchar configuration, as an extra long message exceeding 255 varchar configuration, as an extra long message exceeding 255 varchar configuration, as an extra long message exceeding 255 varchar configuration, as an extra long message exceeding 255 varchar configuration, as an extra long message exceeding 255 varchar configuration, as an extra long message exceeding 255 varchar configuration.', + $error->message, + ); + + $subscription = self::findSubscription($engine->subscriptions(), 'error_producer'); + + self::assertEquals(Status::Error, $subscription->status()); + self::assertEquals( + 'subscribe error: as an extra long message exceeding 255 varchar configuration, as an extra long message exceeding 255 varchar configuration, as an extra long message exceeding 255 varchar configuration, as an extra long message exceeding 255 varchar configuration, as an extra long message exceeding 255 varchar configuration, as an extra long message exceeding 255 varchar configuration, as an extra long message exceeding 255 varchar configuration, as an extra long message exceeding 255 varchar configuration, as an extra long message exceeding 255 varchar configuration.', + $subscription->subscriptionError()?->errorMessage, + ); + self::assertEquals(Status::Active, $subscription->subscriptionError()?->previousStatus); + self::assertEquals(0, $subscription->retryAttempt()); + } + + public function testProcessor(): void + { + $store = new StreamIlluminateStore( + $this->connection, + DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']), + ); + + $clock = new FrozenClock(new DateTimeImmutable('2021-01-01T00:00:00')); + $subscriptionStore = new IlluminateSubscriptionStore($this->connection, $clock); + + $manager = new DefaultRepositoryManager( + new AggregateRootRegistry(['profile' => Profile::class]), + $store, + null, + null, + ); + + $subscriberAccessorRepository = new MetadataSubscriberAccessorRepository([new ProfileProcessor($manager)]); + + $repository = $manager->get(Profile::class); + + $engine = new CatchUpSubscriptionEngine( + new DefaultSubscriptionEngine( + $store, + $subscriptionStore, + $subscriberAccessorRepository, + ), + ); + + self::assertEquals( + [ + new Subscription( + 'profile', + 'processor', + RunMode::FromNow, + Status::Active, + lastSavedAt: new DateTimeImmutable('2021-01-01T00:00:00'), + ), + ], + $engine->subscriptions(), + ); + + $profile = Profile::create(ProfileId::generate(), 'John'); + $repository->save($profile); + + $engine->run(); + + $subscriptions = $engine->subscriptions(); + + self::assertCount(1, $subscriptions); + self::assertArrayHasKey(0, $subscriptions); + + $subscription = $subscriptions[0]; + + self::assertEquals('profile', $subscription->id()); + + self::assertEquals(Status::Active, $subscription->status()); + + /** @var list $messages */ + $messages = iterator_to_array($store->load()); + + self::assertCount(3, $messages); + self::assertArrayHasKey(2, $messages); + } + + public function testBlueGreenDeployment(): void + { + // Test Setup + + $store = new StreamIlluminateStore( + $this->connection, + DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']), + ); + + $clock = new FrozenClock(new DateTimeImmutable('2021-01-01T00:00:00')); + $subscriptionStore = new IlluminateSubscriptionStore($this->connection, $clock); + + $manager = new DefaultRepositoryManager( + new AggregateRootRegistry(['profile' => Profile::class]), + $store, + ); + + $repository = $manager->get(Profile::class); + + $firstEngine = new ThrowOnErrorSubscriptionEngine(new DefaultSubscriptionEngine( + $store, + $subscriptionStore, + new MetadataSubscriberAccessorRepository([new ProfileProjection($this->projectionConnection)]), + )); + + // Deploy first version + + $firstEngine->setup(); + $firstEngine->boot(); + + self::assertEquals( + [ + new Subscription( + 'profile_1', + 'projector', + RunMode::FromBeginning, + Status::Active, + lastSavedAt: new DateTimeImmutable('2021-01-01T00:00:00'), + ), + ], + $firstEngine->subscriptions(), + ); + + // Run first version + + $profile = Profile::create(ProfileId::generate(), 'John'); + $repository->save($profile); + + $firstEngine->run(); + + self::assertEquals( + [ + new Subscription( + 'profile_1', + 'projector', + RunMode::FromBeginning, + Status::Active, + 1, + lastSavedAt: new DateTimeImmutable('2021-01-01T00:00:00'), + ), + ], + $firstEngine->subscriptions(), + ); + + // deploy second version + + $secondEngine = new ThrowOnErrorSubscriptionEngine(new DefaultSubscriptionEngine( + $store, + $subscriptionStore, + new MetadataSubscriberAccessorRepository([new ProfileNewProjection($this->projectionConnection)]), + )); + + $secondEngine->setup(); + $secondEngine->boot(); + + self::assertEquals( + [ + new Subscription( + 'profile_1', + 'projector', + RunMode::FromBeginning, + Status::Active, + 1, + lastSavedAt: new DateTimeImmutable('2021-01-01T00:00:00'), + ), + new Subscription( + 'profile_2', + 'projector', + RunMode::FromBeginning, + Status::Active, + 1, + lastSavedAt: new DateTimeImmutable('2021-01-01T00:00:00'), + ), + ], + $firstEngine->subscriptions(), + ); + + // switch traffic + + $secondEngine->run(); + + self::assertEquals( + [ + new Subscription( + 'profile_1', + 'projector', + RunMode::FromBeginning, + Status::Detached, + 1, + lastSavedAt: new DateTimeImmutable('2021-01-01T00:00:00'), + ), + new Subscription( + 'profile_2', + 'projector', + RunMode::FromBeginning, + Status::Active, + 1, + lastSavedAt: new DateTimeImmutable('2021-01-01T00:00:00'), + ), + ], + $secondEngine->subscriptions(), + ); + + // shutdown first version + + $firstEngine->teardown(); + + self::assertEquals( + [ + new Subscription( + 'profile_2', + 'projector', + RunMode::FromBeginning, + Status::Active, + 1, + lastSavedAt: new DateTimeImmutable('2021-01-01T00:00:00'), + ), + ], + $secondEngine->subscriptions(), + ); + } + + public function testBlueGreenDeploymentRollback(): void + { + // Test Setup + + $store = new StreamIlluminateStore( + $this->connection, + DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']), + ); + + $clock = new FrozenClock(new DateTimeImmutable('2021-01-01T00:00:00')); + + $subscriptionStore = new IlluminateSubscriptionStore( + $this->connection, + $clock, + ); + + $manager = new DefaultRepositoryManager( + new AggregateRootRegistry(['profile' => Profile::class]), + $store, + ); + + $repository = $manager->get(Profile::class); + + + + $firstEngine = new DefaultSubscriptionEngine( + $store, + $subscriptionStore, + new MetadataSubscriberAccessorRepository([new ProfileProjection($this->projectionConnection)]), + ); + + // Deploy first version + + $firstEngine->setup(); + $firstEngine->boot(); + + self::assertEquals( + [ + new Subscription( + 'profile_1', + 'projector', + RunMode::FromBeginning, + Status::Active, + lastSavedAt: new DateTimeImmutable('2021-01-01T00:00:00'), + ), + ], + $firstEngine->subscriptions(), + ); + + // Run first version + + $profile = Profile::create(ProfileId::generate(), 'John'); + $repository->save($profile); + + $firstEngine->run(); + + self::assertEquals( + [ + new Subscription( + 'profile_1', + 'projector', + RunMode::FromBeginning, + Status::Active, + 1, + lastSavedAt: new DateTimeImmutable('2021-01-01T00:00:00'), + ), + ], + $firstEngine->subscriptions(), + ); + + // deploy second version + + $secondEngine = new DefaultSubscriptionEngine( + $store, + $subscriptionStore, + new MetadataSubscriberAccessorRepository([new ProfileNewProjection($this->projectionConnection)]), + ); + + $secondEngine->setup(); + $secondEngine->boot(); + + self::assertEquals( + [ + new Subscription( + 'profile_1', + 'projector', + RunMode::FromBeginning, + Status::Active, + 1, + lastSavedAt: new DateTimeImmutable('2021-01-01T00:00:00'), + ), + new Subscription( + 'profile_2', + 'projector', + RunMode::FromBeginning, + Status::Active, + 1, + lastSavedAt: new DateTimeImmutable('2021-01-01T00:00:00'), + ), + ], + $firstEngine->subscriptions(), + ); + + // switch traffic + + $secondEngine->run(); + + self::assertEquals( + [ + new Subscription( + 'profile_1', + 'projector', + RunMode::FromBeginning, + Status::Detached, + 1, + lastSavedAt: new DateTimeImmutable('2021-01-01T00:00:00'), + ), + new Subscription( + 'profile_2', + 'projector', + RunMode::FromBeginning, + Status::Active, + 1, + lastSavedAt: new DateTimeImmutable('2021-01-01T00:00:00'), + ), + ], + $secondEngine->subscriptions(), + ); + + // rollback + + $firstEngine->setup(); + $firstEngine->boot(); + + self::assertEquals( + [ + new Subscription( + 'profile_1', + 'projector', + RunMode::FromBeginning, + Status::Detached, + 1, + lastSavedAt: new DateTimeImmutable('2021-01-01T00:00:00'), + ), + new Subscription( + 'profile_2', + 'projector', + RunMode::FromBeginning, + Status::Active, + 1, + lastSavedAt: new DateTimeImmutable('2021-01-01T00:00:00'), + ), + ], + $firstEngine->subscriptions(), + ); + + // reactivating detached subscription + + $firstEngine->reactivate(new SubscriptionEngineCriteria( + ids: ['profile_1'], + )); + + // switch traffic + + $firstEngine->run(); + + self::assertEquals( + [ + new Subscription( + 'profile_1', + 'projector', + RunMode::FromBeginning, + Status::Active, + 1, + lastSavedAt: new DateTimeImmutable('2021-01-01T00:00:00'), + ), + new Subscription( + 'profile_2', + 'projector', + RunMode::FromBeginning, + Status::Detached, + 1, + lastSavedAt: new DateTimeImmutable('2021-01-01T00:00:00'), + ), + ], + $firstEngine->subscriptions(), + ); + + // shutdown second version + + $secondEngine->teardown(); + + self::assertEquals( + [ + new Subscription( + 'profile_1', + 'projector', + RunMode::FromBeginning, + Status::Active, + 1, + lastSavedAt: new DateTimeImmutable('2021-01-01T00:00:00'), + ), + ], + $firstEngine->subscriptions(), + ); + } + + public function testCleanup(): void + { + // Test Setup + + $cleaner = new DefaultCleaner([ + new IlluminateCleanupTaskHandler( + $this->projectionConnection, + ), + ]); + + $store = new StreamIlluminateStore( + $this->connection, + DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']), + ); + + $clock = new FrozenClock(new DateTimeImmutable('2021-01-01T00:00:00')); + + $subscriptionStore = new IlluminateSubscriptionStore( + $this->connection, + $clock, + ); + + $manager = new DefaultRepositoryManager( + new AggregateRootRegistry(['profile' => Profile::class]), + $store, + ); + + $repository = $manager->get(Profile::class); + + + + $firstEngine = new DefaultSubscriptionEngine( + $store, + $subscriptionStore, + new MetadataSubscriberAccessorRepository([new ProfileProjectionWithCleanup($this->projectionConnection)]), + cleaner: $cleaner, + ); + + // Deploy first version + + $firstEngine->setup(); + $firstEngine->boot(); + + self::assertEquals( + [ + new Subscription( + 'profile_1', + 'projector', + RunMode::FromBeginning, + Status::Active, + lastSavedAt: new DateTimeImmutable('2021-01-01T00:00:00'), + cleanupTasks: [new DropTableTask('projection_profile_1')], + ), + ], + $firstEngine->subscriptions(), + ); + + // Run first version + + $profile = Profile::create(ProfileId::generate(), 'John'); + $repository->save($profile); + + $firstEngine->run(); + + self::assertEquals( + [ + new Subscription( + 'profile_1', + 'projector', + RunMode::FromBeginning, + Status::Active, + 1, + lastSavedAt: new DateTimeImmutable('2021-01-01T00:00:00'), + cleanupTasks: [new DropTableTask('projection_profile_1')], + ), + ], + $firstEngine->subscriptions(), + ); + + // deploy second version + + $secondEngine = new DefaultSubscriptionEngine( + $store, + $subscriptionStore, + new MetadataSubscriberAccessorRepository([new ProfileNewProjection($this->projectionConnection)]), + cleaner: $cleaner, + ); + + $secondEngine->setup(); + $secondEngine->boot(); + + self::assertEquals( + [ + new Subscription( + 'profile_1', + 'projector', + RunMode::FromBeginning, + Status::Active, + 1, + lastSavedAt: new DateTimeImmutable('2021-01-01T00:00:00'), + cleanupTasks: [new DropTableTask('projection_profile_1')], + ), + new Subscription( + 'profile_2', + 'projector', + RunMode::FromBeginning, + Status::Active, + 1, + lastSavedAt: new DateTimeImmutable('2021-01-01T00:00:00'), + ), + ], + $firstEngine->subscriptions(), + ); + + // switch traffic + + $secondEngine->run(); + + self::assertEquals( + [ + new Subscription( + 'profile_1', + 'projector', + RunMode::FromBeginning, + Status::Detached, + 1, + lastSavedAt: new DateTimeImmutable('2021-01-01T00:00:00'), + cleanupTasks: [new DropTableTask('projection_profile_1')], + ), + new Subscription( + 'profile_2', + 'projector', + RunMode::FromBeginning, + Status::Active, + 1, + lastSavedAt: new DateTimeImmutable('2021-01-01T00:00:00'), + ), + ], + $secondEngine->subscriptions(), + ); + + // shutdown second version (with cleanup) + + $secondEngine->teardown(); + + self::assertEquals( + [ + new Subscription( + 'profile_2', + 'projector', + RunMode::FromBeginning, + Status::Active, + 1, + lastSavedAt: new DateTimeImmutable('2021-01-01T00:00:00'), + ), + ], + $secondEngine->subscriptions(), + ); + + self::assertFalse( + $this->projectionConnection->getSchemaBuilder()->hasTable('projection_profile_1'), + ); + } + + public function testPipeline(): void + { + $clock = new FrozenClock(new DateTimeImmutable('2021-01-01T00:00:00')); + + $store = new StreamIlluminateStore( + $this->connection, + DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']), + ); + + $targetStore = new StreamIlluminateStore( + $this->projectionConnection, + DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']), + config: ['table_name' => 'new_eventstore'], + ); + + $subscriptionStore = new IlluminateSubscriptionStore($this->connection, $clock); + + $manager = new DefaultRepositoryManager(new AggregateRootRegistry(['profile' => Profile::class]), $store); + $repository = $manager->get(Profile::class); + + $engine = new DefaultSubscriptionEngine( + $store, + $subscriptionStore, + new MetadataSubscriberAccessorRepository([new MigrateAggregateToStreamStoreSubscriber($targetStore, $this->projectionConnection)]), + ); + + self::assertEquals( + [ + new Subscription( + 'migrate', + 'default', + RunMode::Once, + lastSavedAt: new DateTimeImmutable('2021-01-01T00:00:00'), + ), + ], + $engine->subscriptions(), + ); + + $result = $engine->setup(); + + self::assertEquals([], $result->errors); + + self::assertTrue($this->projectionConnection->getSchemaBuilder()->hasTable('new_eventstore')); + + $profileId = ProfileId::generate(); + $profile = Profile::create($profileId, 'John'); + + for ($i = 1; $i < 1_000; $i++) { + $profile->changeName(sprintf('John %d', $i)); + } + + $repository->save($profile); + + $result = $engine->boot(); + + self::assertEquals(1_000, $result->processedMessages); + + self::assertEquals([], $result->errors); + + self::assertEquals( + [ + new Subscription( + 'migrate', + 'default', + RunMode::Once, + Status::Finished, + 1_000, + lastSavedAt: new DateTimeImmutable('2021-01-01T00:00:00'), + ), + ], + $engine->subscriptions(), + ); + + // target store check + + $result = $engine->remove(); + self::assertEquals([], $result->errors); + + self::assertEquals( + [ + new Subscription( + 'migrate', + 'default', + RunMode::Once, + Status::New, + lastSavedAt: new DateTimeImmutable('2021-01-01T00:00:00'), + ), + ], + $engine->subscriptions(), + ); + + self::assertFalse( + $this->projectionConnection->getSchemaBuilder()->hasTable('new_eventstore'), + ); + } + + public function testLookup(): void + { + $eventRegistry = (new AttributeEventRegistryFactory())->create([__DIR__ . '/Events']); + $serializer = new DefaultEventSerializer($eventRegistry); + + $store = new StreamIlluminateStore($this->connection, $serializer); + + $clock = new FrozenClock(new DateTimeImmutable('2021-01-01T00:00:00')); + + $subscriptionStore = new IlluminateSubscriptionStore($this->connection, $clock); + $manager = new DefaultRepositoryManager(new AggregateRootRegistry(['profile' => Profile::class]), $store); + + $repository = $manager->get(Profile::class); + + $subscriberRepository = new MetadataSubscriberAccessorRepository( + [ + new LookupSubscriber($this->projectionConnection), + ], + argumentResolvers: [ + new LookupResolver( + $store, + $eventRegistry, + ), + ], + ); + + $engine = new DefaultSubscriptionEngine( + new StoreMessageLoader($store), + $subscriptionStore, + $subscriberRepository, + ); + + $result = $engine->setup(); + + self::assertEquals([], $result->errors); + + $result = $engine->boot(); + + self::assertEquals(0, $result->processedMessages); + self::assertEquals([], $result->errors); + + $profileId = ProfileId::generate(); + $profile = Profile::create($profileId, 'John'); + $repository->save($profile); + + $result = $engine->run(); + + self::assertEquals(1, $result->processedMessages); + self::assertEquals([], $result->errors); + + $result = $this->projectionConnection->selectOne( + 'SELECT * FROM projection_lookup WHERE id = ?', + [$profileId->toString()], + ); + + self::assertNull($result); + + $profile->changeName('Hans'); + $profile->promoteToAdmin(); + $repository->save($profile); + + $result = $engine->run(); + + self::assertEquals(2, $result->processedMessages); + self::assertEquals([], $result->errors); + + $result = $this->projectionConnection->selectOne( + 'SELECT * FROM projection_lookup WHERE id = ?', + [$profileId->toString()], + ); + + self::assertIsObject($result); + self::assertObjectHasProperty('id', $result); + self::assertSame($profileId->toString(), $result->id); + self::assertSame('Hans', $result->name); + } + + public function testRefreshSubscriptions(): void + { + $store = new StreamIlluminateStore( + $this->connection, + DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']), + ); + + $clock = new FrozenClock(new DateTimeImmutable('2021-01-01T00:00:00')); + + $subscriptionStore = new IlluminateSubscriptionStore( + $this->connection, + $clock, + ); + + + + $subscriber = new #[Subscriber('test', RunMode::FromBeginning, group: 'default')] + class { + }; + + $subscriberRepository = new MetadataSubscriberAccessorRepository([$subscriber]); + + $engine = new DefaultSubscriptionEngine( + $this->createMock(MessageLoader::class), + $subscriptionStore, + $subscriberRepository, + ); + + $engine->setup(); + + $subscriptions = $engine->subscriptions(); + self::assertCount(1, $subscriptions); + self::assertEquals('test', $subscriptions[0]->id()); + self::assertEquals('default', $subscriptions[0]->group()); + self::assertEquals(RunMode::FromBeginning, $subscriptions[0]->runMode()); + + // change subscriber metadata + $newSubscriber = new #[Subscriber('test', RunMode::FromNow, group: 'new-group')] + class { + }; + + $newSubscriberRepository = new MetadataSubscriberAccessorRepository([$newSubscriber]); + + $engine = new DefaultSubscriptionEngine( + $this->createMock(MessageLoader::class), + $subscriptionStore, + $newSubscriberRepository, + ); + + $engine->refresh(); + + $subscriptions = $engine->subscriptions(); + self::assertCount(1, $subscriptions); + self::assertEquals('test', $subscriptions[0]->id()); + self::assertEquals('new-group', $subscriptions[0]->group()); + self::assertEquals(RunMode::FromNow, $subscriptions[0]->runMode()); + } + + /** @param list $subscriptions */ + private static function findSubscription(array $subscriptions, string $id): Subscription + { + foreach ($subscriptions as $subscription) { + if ($subscription->id() === $id) { + return $subscription; + } + } + + self::fail('subscription not found'); + } +}