diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml
new file mode 100644
index 0000000..9344198
--- /dev/null
+++ b/.github/workflows/integration.yml
@@ -0,0 +1,216 @@
+# https://help.github.com/en/categories/automating-your-workflow-with-github-actions
+
+name: "Integration tests"
+
+on:
+ pull_request:
+ push:
+ branches:
+ - "[0-9]+.[0-9]+.x"
+ - "renovate/*"
+
+jobs:
+ postgres:
+ name: "Postgres"
+
+ runs-on: ${{ matrix.operating-system }}
+
+ services:
+ postgres:
+ # Docker Hub image
+ image: "postgres:${{ matrix.postgres-version }}"
+ # Provide the password for postgres
+ env:
+ POSTGRES_PASSWORD: postgres
+ POSTGRES_DB: event_store
+ options: >-
+ --health-cmd "pg_isready"
+ ports:
+ - "5432:5432"
+
+ strategy:
+ matrix:
+ dependencies:
+ - "locked"
+ php-version:
+ - "8.5"
+ operating-system:
+ - "ubuntu-latest"
+ postgres-version:
+ - "14.20"
+ - "15.15"
+ - "16.11"
+ - "17.7"
+ - "18.1"
+
+ env:
+ DB_URL: 'pgsql://postgres:postgres@localhost:5432/event_store?charset=utf8'
+ DB_CONNECTION: 'pgsql'
+ DB_DATABASE: 'event_store'
+
+ steps:
+ - name: "Checkout"
+ uses: actions/checkout@v6
+
+ - name: "Install PHP"
+ uses: "shivammathur/setup-php@2.37.0"
+ with:
+ coverage: "pcov"
+ php-version: "${{ matrix.php-version }}"
+ ini-values: memory_limit=-1
+ extensions: pdo_pgsql
+
+ - uses: ramsey/composer-install@4.0.0
+ with:
+ dependency-versions: ${{ matrix.dependencies }}
+ composer-options: ${{ matrix.composer-options }}
+
+ - name: "Tests"
+ run: "vendor/bin/phpunit --testsuite=integration --no-coverage"
+
+ mariadb:
+ name: "mariadb"
+
+ runs-on: ${{ matrix.operating-system }}
+
+ services:
+ mariadb:
+ image: "mariadb:${{ matrix.mariadb-version }}"
+ env:
+ MYSQL_ALLOW_EMPTY_PASSWORD: yes
+ MYSQL_DATABASE: event_store
+
+ options: >-
+ --health-cmd "mariadb-admin ping --silent"
+ ports:
+ - "3306:3306"
+
+ strategy:
+ matrix:
+ dependencies:
+ - "locked"
+ php-version:
+ - "8.5"
+ operating-system:
+ - "ubuntu-latest"
+ mariadb-version:
+ - "10.6"
+ - "10.11"
+ - "11.4"
+ - "11.8"
+ - "12.1"
+
+ env:
+ DB_URL: 'mysql://root@127.0.0.1:3306/event_store?charset=utf8'
+ DB_CONNECTION: 'mysql'
+ DB_DATABASE: 'event_store'
+
+ steps:
+ - name: "Checkout"
+ uses: actions/checkout@v6
+
+ - name: "Install PHP"
+ uses: "shivammathur/setup-php@2.37.0"
+ with:
+ coverage: "pcov"
+ php-version: "${{ matrix.php-version }}"
+ ini-values: memory_limit=-1
+ extensions: pdo_mysql
+
+ - uses: ramsey/composer-install@4.0.0
+ with:
+ dependency-versions: ${{ matrix.dependencies }}
+ composer-options: ${{ matrix.composer-options }}
+
+ - name: "Tests"
+ run: "vendor/bin/phpunit --testsuite=integration --no-coverage"
+
+ mysql:
+ name: "mysql"
+
+ runs-on: ${{ matrix.operating-system }}
+
+ services:
+ mysql:
+ image: "mysql:${{ matrix.mysql-version }}"
+
+ env:
+ MYSQL_ALLOW_EMPTY_PASSWORD: yes
+ MYSQL_DATABASE: "event_store"
+
+ options: >-
+ --health-cmd "mysqladmin ping --silent"
+ ports:
+ - "3306:3306"
+
+ strategy:
+ matrix:
+ dependencies:
+ - "locked"
+ php-version:
+ - "8.5"
+ operating-system:
+ - "ubuntu-latest"
+ mysql-version:
+ - "8.0"
+ - "8.4"
+ - "9.5"
+
+ env:
+ DB_URL: 'mysql://root@127.0.0.1:3306/event_store?charset=utf8'
+ DB_CONNECTION: 'mysql'
+ DB_DATABASE: 'event_store'
+
+ steps:
+ - name: "Checkout"
+ uses: actions/checkout@v6
+
+ - name: "Install PHP"
+ uses: "shivammathur/setup-php@2.37.0"
+ with:
+ coverage: "pcov"
+ php-version: "${{ matrix.php-version }}"
+ ini-values: memory_limit=-1
+ extensions: pdo_mysql
+
+ - uses: ramsey/composer-install@4.0.0
+ with:
+ dependency-versions: ${{ matrix.dependencies }}
+ composer-options: ${{ matrix.composer-options }}
+
+ - name: "Tests"
+ run: "vendor/bin/phpunit --testsuite=integration --no-coverage"
+
+ sqlite:
+ name: "Sqlite"
+
+ runs-on: ${{ matrix.operating-system }}
+
+ strategy:
+ matrix:
+ dependencies:
+ - "locked"
+ php-version:
+ - "8.5"
+ operating-system:
+ - "ubuntu-latest"
+
+ steps:
+ - name: "Checkout"
+ uses: actions/checkout@v6
+
+ - name: "Install PHP"
+ uses: "shivammathur/setup-php@2.37.0"
+ with:
+ coverage: "pcov"
+ php-version: "${{ matrix.php-version }}"
+ ini-values: memory_limit=-1
+ extensions: pdo_sqlite
+
+ - uses: ramsey/composer-install@4.0.0
+ with:
+ dependency-versions: ${{ matrix.dependencies }}
+ composer-options: ${{ matrix.composer-options }}
+
+ - name: "Tests"
+ run: "vendor/bin/phpunit --testsuite=integration --no-coverage"
diff --git a/.github/workflows/phpunit.yml b/.github/workflows/unit.yml
similarity index 92%
rename from .github/workflows/phpunit.yml
rename to .github/workflows/unit.yml
index dc078e6..538977e 100644
--- a/.github/workflows/phpunit.yml
+++ b/.github/workflows/unit.yml
@@ -51,4 +51,4 @@ jobs:
dependency-versions: ${{ matrix.dependencies }}
- name: "Tests"
- run: "vendor/bin/phpunit --coverage-clover=clover.xml --coverage-text"
+ run: "vendor/bin/phpunit --testsuite=unit --coverage-clover=clover.xml --coverage-text"
diff --git a/Makefile b/Makefile
index 8106244..2d69f3b 100644
--- a/Makefile
+++ b/Makefile
@@ -22,7 +22,8 @@ phpstan-baseline: vendor
.PHONY: phpunit
phpunit: vendor ## run phpunit tests
- XDEBUG_MODE=coverage vendor/bin/phpunit
+ XDEBUG_MODE=coverage vendor/bin/phpunit --testsuite=unit
+ XDEBUG_MODE=off vendor/bin/phpunit --testsuite=integration --no-coverage
.PHONY: infection
infection: vendor ## run infection
diff --git a/compose.yaml b/compose.yaml
new file mode 100644
index 0000000..9eb1711
--- /dev/null
+++ b/compose.yaml
@@ -0,0 +1,14 @@
+services:
+ postgres:
+ image: postgres:16-alpine
+ environment:
+ POSTGRES_DB: event_store
+ POSTGRES_USER: postgres
+ POSTGRES_PASSWORD: postgres
+ ports:
+ - "5432:5432"
+ healthcheck:
+ test: ["CMD-SHELL", "pg_isready -U postgres -d eventstore"]
+ interval: 5s
+ timeout: 5s
+ retries: 5
diff --git a/composer.json b/composer.json
index 9f18013..e37d006 100644
--- a/composer.json
+++ b/composer.json
@@ -20,7 +20,7 @@
"require": {
"php": "~8.3.0 || ~8.4.0 || ~8.5.0",
"illuminate/contracts": "^11.0 || ^12.0 || ^13.0",
- "patchlevel/event-sourcing": "^3.15.0"
+ "patchlevel/event-sourcing": "^3.16.0"
},
"require-dev": {
"ext-pdo_sqlite": "*",
diff --git a/composer.lock b/composer.lock
index d28499c..078807f 100644
--- a/composer.lock
+++ b/composer.lock
@@ -4,7 +4,7 @@
"Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies",
"This file is @generated automatically"
],
- "content-hash": "5a9aa63bf924f6f7049a443ed3f64807",
+ "content-hash": "0dcd18037fb69f46a92629e34fd2c50b",
"packages": [
{
"name": "brick/math",
diff --git a/database/migrations/create_eventsourcing_tables.php b/database/migrations/create_eventsourcing_tables.php
index 29ad7c8..e777191 100644
--- a/database/migrations/create_eventsourcing_tables.php
+++ b/database/migrations/create_eventsourcing_tables.php
@@ -8,38 +8,40 @@
{
public function up(): void
{
- Schema::create('eventstore', function (Blueprint $table) {
- $table->bigIncrements('id');
- $table->string('aggregate', 255);
- $table->char('aggregate_id', 36);
- $table->integer('playhead');
- $table->string('event', 255);
- $table->json('payload');
- $table->dateTime('recorded_on');
- $table->tinyInteger('new_stream_start')->default(0);
- $table->tinyInteger('archived')->default(0);
+ Schema::create('event_store', function (Blueprint $table) {
+ $table->bigInteger('id', true);
+ $table->string('stream', 255);
+ $table->integer('playhead')->nullable();
+ $table->string('event_id', 255);
+ $table->string('event_name', 255);
+ $table->json('event_payload');
+ $table->dateTimeTz('recorded_on');
+ $table->boolean('archived')->default(false);
$table->json('custom_headers');
- $table->unique(['aggregate', 'aggregate_id', 'playhead']);
- $table->index(['aggregate', 'aggregate_id', 'playhead', 'archived']);
+
+ $table->unique('event_id');
+ $table->unique(['stream', 'playhead']);
+ $table->unique(['stream', 'playhead', 'archived']);
});
Schema::create('subscriptions', function (Blueprint $table) {
$table->string('id', 255);
$table->string('group_name', 32);
$table->string('run_mode', 16);
- $table->integer('position');
$table->string('status', 32);
+ $table->integer('position');
$table->longText('error_message')->nullable();
$table->string('error_previous_status', 32)->nullable();
$table->json('error_context')->nullable();
$table->integer('retry_attempt');
$table->dateTime('last_saved_at');
+ $table->text('cleanup_tasks')->nullable();
$table->index('group_name');
$table->index('status');
$table->primary('id');
});
- Schema::create('eventstore_cipher_keys', function (Blueprint $table) {
+ Schema::create('crypto_keys', function (Blueprint $table) {
$table->string('subject_id', 255);
$table->string('crypto_key', 255);
$table->string('crypto_method', 255);
@@ -50,8 +52,8 @@ public function up(): void
public function down(): void
{
- Schema::dropIfExists('eventstore');
+ Schema::dropIfExists('event_store');
Schema::dropIfExists('subscriptions');
- Schema::dropIfExists('eventstore_cipher_keys');
+ Schema::dropIfExists('crypto_keys');
}
};
diff --git a/phpunit.xml.dist b/phpunit.xml.dist
index 110f209..22329a1 100644
--- a/phpunit.xml.dist
+++ b/phpunit.xml.dist
@@ -11,6 +11,9 @@
tests/Unit
+
+ tests/Integration
+
@@ -22,6 +25,7 @@
+
diff --git a/src/Cryptography/IlluminateCipherKeyStore.php b/src/Cryptography/IlluminateCipherKeyStore.php
new file mode 100644
index 0000000..238ff20
--- /dev/null
+++ b/src/Cryptography/IlluminateCipherKeyStore.php
@@ -0,0 +1,96 @@
+ */
+ private array $keyCache = [];
+
+ public function __construct(
+ private readonly Connection $connection,
+ private readonly string $tableName = 'crypto_keys',
+ ) {
+ }
+
+ public function get(string $id): CipherKey
+ {
+ if (array_key_exists($id, $this->keyCache)) {
+ return $this->keyCache[$id];
+ }
+
+ /** @var Row|null $result */
+ $result = $this->connection->selectOne(
+ "SELECT * FROM {$this->tableName} WHERE subject_id = :subject_id",
+ ['subject_id' => $id],
+ );
+
+ if ($result === null) {
+ throw new CipherKeyNotExists($id);
+ }
+
+ $this->keyCache[$id] = new CipherKey(
+ base64_decode($result->crypto_key),
+ $result->crypto_method,
+ base64_decode($result->crypto_iv),
+ );
+
+ return $this->keyCache[$id];
+ }
+
+ public function store(string $id, CipherKey $key): void
+ {
+ $this->connection->statement(
+ <<tableName}
+ (subject_id, crypto_key, crypto_method, crypto_iv)
+ VALUES
+ (:subject_id, :crypto_key, :crypto_method, :crypto_iv)
+SQL,
+ [
+ 'subject_id' => $id,
+ 'crypto_key' => base64_encode($key->key),
+ 'crypto_method' => $key->method,
+ 'crypto_iv' => base64_encode($key->iv),
+ ],
+ );
+
+ $this->keyCache[$id] = $key;
+ }
+
+ public function remove(string $id): void
+ {
+ $this->connection->statement(
+ <<tableName} WHERE subject_id = :subject_id
+SQL,
+ ['subject_id' => $id],
+ );
+
+ unset($this->keyCache[$id]);
+ }
+
+ public function clear(): void
+ {
+ $this->keyCache = [];
+ }
+}
diff --git a/src/Store/StreamIlluminateStore.php b/src/Store/StreamIlluminateStore.php
new file mode 100644
index 0000000..3afbb1d
--- /dev/null
+++ b/src/Store/StreamIlluminateStore.php
@@ -0,0 +1,480 @@
+headersSerializer = $headersSerializer ?? DefaultHeadersSerializer::createDefault();
+ $this->clock = $clock ?? new SystemClock();
+
+ $this->config = [
+ 'table_name' => 'event_store',
+ 'locking' => true,
+ 'lock_id' => self::DEFAULT_LOCK_ID,
+ 'lock_timeout' => -1,
+ 'keep_index' => false,
+ ...$config,
+ ];
+ }
+
+ public function load(
+ Criteria|null $criteria = null,
+ int|null $limit = null,
+ int|null $offset = null,
+ bool $backwards = false,
+ ): Stream {
+ $builder = $this->connection
+ ->table($this->config['table_name'])
+ ->select('*')
+ ->orderBy('id', $backwards ? 'desc' : 'asc');
+
+ $this->applyCriteria($builder, $criteria ?? new Criteria());
+
+ if ($limit !== null) {
+ $builder->limit($limit);
+ }
+
+ if ($offset !== null) {
+ $builder->offset($offset);
+ }
+
+ return new StreamIlluminateStoreStream(
+ $builder->cursor(),
+ $this->eventSerializer,
+ $this->headersSerializer,
+ );
+ }
+
+ public function count(Criteria|null $criteria = null): int
+ {
+ $builder = $this->connection
+ ->table($this->config['table_name']);
+
+ $this->applyCriteria($builder, $criteria ?? new Criteria());
+
+ return $builder->count();
+ }
+
+ public function save(Message ...$messages): void
+ {
+ if ($messages === []) {
+ return;
+ }
+
+ $this->transactional(function () use ($messages): void {
+ $columnsLength = $this->config['keep_index'] ? 9 : 8;
+ $batchSize = (int)floor(self::MAX_UNSIGNED_SMALL_INT / $columnsLength);
+
+ $rows = [];
+ foreach ($messages as $message) {
+ $data = $this->eventSerializer->serialize($message->event());
+
+ try {
+ $streamName = $message->header(StreamNameHeader::class)->streamName;
+ } catch (HeaderNotFound $e) {
+ throw new MissingDataForStorage($e->name, $e);
+ }
+
+ $eventId = $message->hasHeader(EventIdHeader::class)
+ ? $message->header(EventIdHeader::class)->eventId
+ : Uuid::uuid7()->toString();
+
+ $row = [
+ 'stream' => $streamName,
+ 'playhead' => $message->hasHeader(PlayheadHeader::class)
+ ? $message->header(PlayheadHeader::class)->playhead
+ : null,
+ 'event_id' => $eventId,
+ 'event_name' => $data->name,
+ 'event_payload' => $data->payload,
+ 'recorded_on' => $message->hasHeader(RecordedOnHeader::class)
+ ? $message->header(RecordedOnHeader::class)->recordedOn
+ : $this->clock->now(),
+ 'archived' => $message->hasHeader(ArchivedHeader::class),
+ 'custom_headers' => $this->headersSerializer->serialize($this->getCustomHeaders($message)),
+ ];
+
+ if ($this->config['keep_index']) {
+ try {
+ $row['id'] = $message->header(IndexHeader::class)->index;
+ } catch (HeaderNotFound $e) {
+ throw new MissingDataForStorage($e->name, $e);
+ }
+ }
+
+ $rows[] = $row;
+
+ if (count($rows) !== $batchSize) {
+ continue;
+ }
+
+ $this->executeSave($rows);
+ $rows = [];
+ }
+
+ if ($rows !== []) {
+ $this->executeSave($rows);
+ }
+
+ if (!$this->config['keep_index'] || $this->driverName() !== 'pgsql') {
+ return;
+ }
+
+ $this->connection->statement(sprintf(
+ "SELECT setval('%s', (SELECT MAX(id) FROM %s));",
+ sprintf('%s_id_seq', $this->config['table_name']),
+ $this->config['table_name'],
+ ));
+ });
+ }
+
+ public function transactional(Closure $function): void
+ {
+ if ($this->hasLock || !$this->config['locking']) {
+ $this->connection->transaction($function);
+
+ return;
+ }
+
+ $this->connection->transaction(function () use ($function): void {
+ $this->lock();
+
+ try {
+ $function();
+ } finally {
+ $this->unlock();
+ }
+ });
+ }
+
+ /** @return list */
+ public function streams(): array
+ {
+ /** @var list $streams */
+ $streams = $this->connection
+ ->table($this->config['table_name'])
+ ->select('stream')
+ ->distinct()
+ ->orderBy('stream')
+ ->pluck('stream')
+ ->all();
+
+ return $streams;
+ }
+
+ public function remove(Criteria|null $criteria = null): void
+ {
+ $builder = $this->connection->table($this->config['table_name']);
+
+ $this->applyCriteria($builder, $criteria ?? new Criteria());
+
+ $builder->delete();
+ }
+
+ public function archive(Criteria|null $criteria = null): void
+ {
+ $builder = $this->connection->table($this->config['table_name']);
+
+ $this->applyCriteria($builder, $criteria ?? new Criteria());
+
+ $builder->update(['archived' => true]);
+ }
+
+ public function supportSubscription(): bool
+ {
+ return $this->driverName() === 'pgsql'
+ && class_exists(PDO::class)
+ && method_exists($this->connection, 'getPdo');
+ }
+
+ public function setupSubscription(): void
+ {
+ if (!$this->supportSubscription()) {
+ return;
+ }
+
+ $functionName = $this->createTriggerFunctionName();
+
+ $this->connection->statement(sprintf(
+ <<<'SQL'
+ CREATE OR REPLACE FUNCTION %1$s() RETURNS TRIGGER AS $$
+ BEGIN
+ PERFORM pg_notify('%2$s', NEW.stream::text);
+ RETURN NEW;
+ END;
+ $$ LANGUAGE plpgsql;
+ SQL,
+ $functionName,
+ $this->config['table_name'],
+ ));
+
+ $this->connection->statement(sprintf(
+ 'DROP TRIGGER IF EXISTS notify_trigger ON %s;',
+ $this->config['table_name'],
+ ));
+
+ $this->connection->statement(sprintf(
+ 'CREATE TRIGGER notify_trigger AFTER INSERT OR UPDATE ON %1$s FOR EACH ROW EXECUTE PROCEDURE %2$s();',
+ $this->config['table_name'],
+ $functionName,
+ ));
+ }
+
+ public function wait(int $timeoutMilliseconds): void
+ {
+ if (!$this->supportSubscription()) {
+ return;
+ }
+
+ $this->connection->statement(sprintf('LISTEN "%s"', $this->config['table_name']));
+
+ if (!$this->connection instanceof Connection) {
+ return;
+ }
+
+ $this->connection->getPdo()->pgsqlGetNotify(PDO::FETCH_ASSOC, $timeoutMilliseconds);
+ }
+
+ /** @return list