From 2bb319193ad3d287f19acd2ffaf7780cac9a3807 Mon Sep 17 00:00:00 2001 From: Micilini Roll Date: Sat, 9 May 2026 18:51:39 -0300 Subject: [PATCH] phase 09: add storage adapters and migrations --- .gitignore | 5 +- README.md | 28 +++ composer.json | 3 +- examples/private-chat/README.md | 6 + src/Database/MigrationRunner.php | 45 ++++ src/Database/Schema/mysql.sql | 59 +++++ src/Database/Schema/pgsql.sql | 64 ++++++ src/Database/Schema/sqlite.sql | 71 ++++++ src/Exceptions/StorageException.php | 11 + src/Storage/File/FileMessageStore.php | 109 +++++++++ src/Storage/Pdo/PdoConnectionFactory.php | 38 ++++ src/Storage/Pdo/PdoMessageStore.php | 101 +++++++++ src/Storage/Pdo/PdoRoomStore.php | 213 ++++++++++++++++++ src/Storage/Pdo/PdoSessionStore.php | 204 +++++++++++++++++ tests/Integration/Storage/SqliteStoreTest.php | 115 ++++++++++ tests/Unit/Storage/InMemoryStoreTest.php | 79 +++++++ 16 files changed, 1149 insertions(+), 2 deletions(-) create mode 100644 src/Database/MigrationRunner.php create mode 100644 src/Database/Schema/mysql.sql create mode 100644 src/Database/Schema/pgsql.sql create mode 100644 src/Database/Schema/sqlite.sql create mode 100644 src/Exceptions/StorageException.php create mode 100644 src/Storage/File/FileMessageStore.php create mode 100644 src/Storage/Pdo/PdoConnectionFactory.php create mode 100644 src/Storage/Pdo/PdoMessageStore.php create mode 100644 src/Storage/Pdo/PdoRoomStore.php create mode 100644 src/Storage/Pdo/PdoSessionStore.php create mode 100644 tests/Integration/Storage/SqliteStoreTest.php create mode 100644 tests/Unit/Storage/InMemoryStoreTest.php diff --git a/.gitignore b/.gitignore index 776c70d..9900982 100644 --- a/.gitignore +++ b/.gitignore @@ -32,6 +32,9 @@ desktop.ini /storage/*.sqlite /storage/*.sqlite3 /storage/*.db +/examples/**/storage/*.sqlite +/examples/**/storage/*.sqlite3 +/examples/**/storage/*.db node_modules/ npm-debug.log* @@ -39,4 +42,4 @@ yarn-debug.log* yarn-error.log* pnpm-debug.log* -/.phpunit.cache \ No newline at end of file +/.phpunit.cache diff --git a/README.md b/README.md index fc2e902..8645ac0 100644 --- a/README.md +++ b/README.md @@ -124,6 +124,33 @@ http://127.0.0.1:8002 PrivateChat demonstrates global chat, direct 1:1 conversations, private group rooms, unread badges, typing indicators and simple message receipts. +## Optional storage adapters + +PHPSockets currently uses in-memory storage by default for the examples. + +The package also includes optional storage adapters: + +```txt +InMemory +File JSONL messages +PDO SQLite +PDO MySQL +PDO PostgreSQL +``` + +SQLite can be initialized programmatically with the migration runner: + +```php +use Micilini\PhpSockets\Database\MigrationRunner; +use Micilini\PhpSockets\Storage\Pdo\PdoConnectionFactory; + +$pdo = PdoConnectionFactory::sqlite(__DIR__ . '/storage/phpsockets.sqlite'); + +(new MigrationRunner($pdo))->run('sqlite'); +``` + +The CLI migration command will be added in a future phase. + ## Requirements The modern version targets: @@ -136,6 +163,7 @@ The modern version targets: Optional future features may require: - `ext-pdo` for SQL storage adapters. +- `ext-pdo_sqlite` for SQLite storage tests and local persistence. - Laravel packages for optional Laravel integration. ## Namespace diff --git a/composer.json b/composer.json index 0dbc102..5f059d9 100644 --- a/composer.json +++ b/composer.json @@ -23,6 +23,7 @@ }, "suggest": { "ext-pdo": "Required for SQL storage adapters and migrations.", + "ext-pdo_sqlite": "Required for SQLite storage tests and local persistence.", "illuminate/support": "Required for Laravel integration." }, "autoload": { @@ -48,4 +49,4 @@ }, "minimum-stability": "stable", "prefer-stable": true -} \ No newline at end of file +} diff --git a/examples/private-chat/README.md b/examples/private-chat/README.md index c807f09..0208f19 100644 --- a/examples/private-chat/README.md +++ b/examples/private-chat/README.md @@ -112,6 +112,12 @@ PrivateChat displays unread badges for Global Room, direct conversations and pri Badges increase while a conversation is not open and reset when the conversation is opened. +## Storage note + +This example still uses in-memory storage by default. + +The package now includes optional storage adapters and migrations, but the official CLI/config workflow is added in a later phase. + ## Important notes This phase implements direct 1:1 private messaging and private group rooms. diff --git a/src/Database/MigrationRunner.php b/src/Database/MigrationRunner.php new file mode 100644 index 0000000..26727c4 --- /dev/null +++ b/src/Database/MigrationRunner.php @@ -0,0 +1,45 @@ +pdo->exec($this->schemaSql($driver)); + } + + private function schemaSql(string $driver): string + { + $driver = strtolower(trim($driver)); + + if (!in_array($driver, ['sqlite', 'mysql', 'pgsql'], true)) { + throw new StorageException("Unsupported migration driver: {$driver}"); + } + + $path = $this->schemaPath ?? dirname(__DIR__) . '/Database/Schema/' . $driver . '.sql'; + + if (!is_file($path)) { + throw new StorageException("Migration schema file not found: {$path}"); + } + + $sql = file_get_contents($path); + + if (!is_string($sql) || trim($sql) === '') { + throw new StorageException("Migration schema file is empty: {$path}"); + } + + return $sql; + } +} diff --git a/src/Database/Schema/mysql.sql b/src/Database/Schema/mysql.sql new file mode 100644 index 0000000..240a99a --- /dev/null +++ b/src/Database/Schema/mysql.sql @@ -0,0 +1,59 @@ +CREATE TABLE IF NOT EXISTS users ( + id VARCHAR(64) PRIMARY KEY, + display_name VARCHAR(120) NOT NULL, + normalized_display_name VARCHAR(120) NOT NULL, + created_at VARCHAR(40) NOT NULL, + UNIQUE KEY idx_users_normalized_display_name (normalized_display_name) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci; + +CREATE TABLE IF NOT EXISTS sessions ( + id VARCHAR(64) PRIMARY KEY, + user_id VARCHAR(64) NOT NULL, + connected TINYINT(1) NOT NULL DEFAULT 0, + connected_at VARCHAR(40) NOT NULL, + last_seen_at VARCHAR(40) NOT NULL, + INDEX idx_sessions_user_id (user_id), + INDEX idx_sessions_connected (connected), + CONSTRAINT fk_sessions_user_id FOREIGN KEY (user_id) REFERENCES users (id) ON DELETE CASCADE +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci; + +CREATE TABLE IF NOT EXISTS rooms ( + id VARCHAR(64) PRIMARY KEY, + type VARCHAR(40) NOT NULL, + name VARCHAR(120) NULL, + created_by VARCHAR(64) NOT NULL, + created_at VARCHAR(40) NOT NULL, + INDEX idx_rooms_type (type) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci; + +CREATE TABLE IF NOT EXISTS room_members ( + room_id VARCHAR(64) NOT NULL, + user_id VARCHAR(64) NOT NULL, + joined_at VARCHAR(40) NOT NULL, + PRIMARY KEY (room_id, user_id), + INDEX idx_room_members_user_id (user_id), + CONSTRAINT fk_room_members_room_id FOREIGN KEY (room_id) REFERENCES rooms (id) ON DELETE CASCADE +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci; + +CREATE TABLE IF NOT EXISTS messages ( + id VARCHAR(64) PRIMARY KEY, + room_id VARCHAR(64) NOT NULL, + from_user_id VARCHAR(64) NOT NULL, + kind VARCHAR(40) NOT NULL, + body TEXT NULL, + metadata_json JSON NOT NULL, + created_at VARCHAR(40) NOT NULL, + INDEX idx_messages_room_created (room_id, created_at), + CONSTRAINT fk_messages_room_id FOREIGN KEY (room_id) REFERENCES rooms (id) ON DELETE CASCADE +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci; + +CREATE TABLE IF NOT EXISTS attachments ( + id VARCHAR(64) PRIMARY KEY, + message_id VARCHAR(64) NOT NULL, + filename VARCHAR(255) NOT NULL, + mime_type VARCHAR(120) NOT NULL, + size_bytes BIGINT NOT NULL, + path TEXT NOT NULL, + created_at VARCHAR(40) NOT NULL, + CONSTRAINT fk_attachments_message_id FOREIGN KEY (message_id) REFERENCES messages (id) ON DELETE CASCADE +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci; diff --git a/src/Database/Schema/pgsql.sql b/src/Database/Schema/pgsql.sql new file mode 100644 index 0000000..d58ba49 --- /dev/null +++ b/src/Database/Schema/pgsql.sql @@ -0,0 +1,64 @@ +CREATE TABLE IF NOT EXISTS users ( + id VARCHAR(64) PRIMARY KEY, + display_name VARCHAR(120) NOT NULL, + normalized_display_name VARCHAR(120) NOT NULL UNIQUE, + created_at VARCHAR(40) NOT NULL +); + +CREATE TABLE IF NOT EXISTS sessions ( + id VARCHAR(64) PRIMARY KEY, + user_id VARCHAR(64) NOT NULL REFERENCES users (id) ON DELETE CASCADE, + connected BOOLEAN NOT NULL DEFAULT FALSE, + connected_at VARCHAR(40) NOT NULL, + last_seen_at VARCHAR(40) NOT NULL +); + +CREATE INDEX IF NOT EXISTS idx_sessions_user_id + ON sessions (user_id); + +CREATE INDEX IF NOT EXISTS idx_sessions_connected + ON sessions (connected); + +CREATE TABLE IF NOT EXISTS rooms ( + id VARCHAR(64) PRIMARY KEY, + type VARCHAR(40) NOT NULL, + name VARCHAR(120) NULL, + created_by VARCHAR(64) NOT NULL, + created_at VARCHAR(40) NOT NULL +); + +CREATE INDEX IF NOT EXISTS idx_rooms_type + ON rooms (type); + +CREATE TABLE IF NOT EXISTS room_members ( + room_id VARCHAR(64) NOT NULL REFERENCES rooms (id) ON DELETE CASCADE, + user_id VARCHAR(64) NOT NULL, + joined_at VARCHAR(40) NOT NULL, + PRIMARY KEY (room_id, user_id) +); + +CREATE INDEX IF NOT EXISTS idx_room_members_user_id + ON room_members (user_id); + +CREATE TABLE IF NOT EXISTS messages ( + id VARCHAR(64) PRIMARY KEY, + room_id VARCHAR(64) NOT NULL REFERENCES rooms (id) ON DELETE CASCADE, + from_user_id VARCHAR(64) NOT NULL, + kind VARCHAR(40) NOT NULL, + body TEXT NULL, + metadata_json JSONB NOT NULL DEFAULT '{}'::jsonb, + created_at VARCHAR(40) NOT NULL +); + +CREATE INDEX IF NOT EXISTS idx_messages_room_created + ON messages (room_id, created_at); + +CREATE TABLE IF NOT EXISTS attachments ( + id VARCHAR(64) PRIMARY KEY, + message_id VARCHAR(64) NOT NULL REFERENCES messages (id) ON DELETE CASCADE, + filename VARCHAR(255) NOT NULL, + mime_type VARCHAR(120) NOT NULL, + size_bytes BIGINT NOT NULL, + path TEXT NOT NULL, + created_at VARCHAR(40) NOT NULL +); diff --git a/src/Database/Schema/sqlite.sql b/src/Database/Schema/sqlite.sql new file mode 100644 index 0000000..29f0202 --- /dev/null +++ b/src/Database/Schema/sqlite.sql @@ -0,0 +1,71 @@ +CREATE TABLE IF NOT EXISTS users ( + id TEXT PRIMARY KEY, + display_name TEXT NOT NULL, + normalized_display_name TEXT NOT NULL, + created_at TEXT NOT NULL +); + +CREATE UNIQUE INDEX IF NOT EXISTS idx_users_normalized_display_name + ON users (normalized_display_name); + +CREATE TABLE IF NOT EXISTS sessions ( + id TEXT PRIMARY KEY, + user_id TEXT NOT NULL, + connected INTEGER NOT NULL DEFAULT 0, + connected_at TEXT NOT NULL, + last_seen_at TEXT NOT NULL, + FOREIGN KEY (user_id) REFERENCES users (id) ON DELETE CASCADE +); + +CREATE INDEX IF NOT EXISTS idx_sessions_user_id + ON sessions (user_id); + +CREATE INDEX IF NOT EXISTS idx_sessions_connected + ON sessions (connected); + +CREATE TABLE IF NOT EXISTS rooms ( + id TEXT PRIMARY KEY, + type TEXT NOT NULL, + name TEXT NULL, + created_by TEXT NOT NULL, + created_at TEXT NOT NULL +); + +CREATE INDEX IF NOT EXISTS idx_rooms_type + ON rooms (type); + +CREATE TABLE IF NOT EXISTS room_members ( + room_id TEXT NOT NULL, + user_id TEXT NOT NULL, + joined_at TEXT NOT NULL, + PRIMARY KEY (room_id, user_id), + FOREIGN KEY (room_id) REFERENCES rooms (id) ON DELETE CASCADE +); + +CREATE INDEX IF NOT EXISTS idx_room_members_user_id + ON room_members (user_id); + +CREATE TABLE IF NOT EXISTS messages ( + id TEXT PRIMARY KEY, + room_id TEXT NOT NULL, + from_user_id TEXT NOT NULL, + kind TEXT NOT NULL, + body TEXT NULL, + metadata_json TEXT NOT NULL DEFAULT '{}', + created_at TEXT NOT NULL, + FOREIGN KEY (room_id) REFERENCES rooms (id) ON DELETE CASCADE +); + +CREATE INDEX IF NOT EXISTS idx_messages_room_created + ON messages (room_id, created_at); + +CREATE TABLE IF NOT EXISTS attachments ( + id TEXT PRIMARY KEY, + message_id TEXT NOT NULL, + filename TEXT NOT NULL, + mime_type TEXT NOT NULL, + size_bytes INTEGER NOT NULL, + path TEXT NOT NULL, + created_at TEXT NOT NULL, + FOREIGN KEY (message_id) REFERENCES messages (id) ON DELETE CASCADE +); diff --git a/src/Exceptions/StorageException.php b/src/Exceptions/StorageException.php new file mode 100644 index 0000000..ab7e5dc --- /dev/null +++ b/src/Exceptions/StorageException.php @@ -0,0 +1,11 @@ +filePath); + + if (!is_dir($directory) && !mkdir($directory, 0775, true) && !is_dir($directory)) { + throw new StorageException("Message storage directory cannot be created: {$directory}"); + } + + if (is_file($this->filePath) && !is_writable($this->filePath)) { + throw new StorageException("Message storage file is not writable: {$this->filePath}"); + } + + if (!is_file($this->filePath) && !is_writable($directory)) { + throw new StorageException("Message storage directory is not writable: {$directory}"); + } + } + + public function save(ChatMessage $message): void + { + $handle = fopen($this->filePath, 'ab'); + + if ($handle === false) { + throw new StorageException("Message storage file cannot be opened: {$this->filePath}"); + } + + try { + if (!flock($handle, LOCK_EX)) { + throw new StorageException("Message storage file cannot be locked: {$this->filePath}"); + } + + $line = json_encode($message->toArray(), JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR) . PHP_EOL; + + if (fwrite($handle, $line) === false) { + throw new StorageException("Message storage file cannot be written: {$this->filePath}"); + } + + flock($handle, LOCK_UN); + } finally { + fclose($handle); + } + } + + public function messagesForRoom(string $roomId, int $limit = 50): array + { + if ($limit <= 0 || !is_file($this->filePath)) { + return []; + } + + $handle = fopen($this->filePath, 'rb'); + + if ($handle === false) { + throw new StorageException("Message storage file cannot be opened: {$this->filePath}"); + } + + $messages = []; + + try { + while (($line = fgets($handle)) !== false) { + $row = json_decode(trim($line), true, 512, JSON_THROW_ON_ERROR); + + if (!is_array($row) || ($row['roomId'] ?? null) !== $roomId) { + continue; + } + + $messages[] = $this->hydrate($row); + } + } finally { + fclose($handle); + } + + return array_slice($messages, -$limit); + } + + /** + * @param array $row + */ + private function hydrate(array $row): ChatMessage + { + $metadata = $row['metadata'] ?? []; + + if (!is_array($metadata)) { + $metadata = []; + } + + /** @var array $metadata */ + return new ChatMessage( + id: (string) $row['id'], + roomId: (string) $row['roomId'], + fromUserId: (string) $row['fromUserId'], + kind: (string) $row['kind'], + body: $row['body'] === null ? null : (string) $row['body'], + createdAt: new DateTimeImmutable((string) $row['createdAt']), + metadata: $metadata, + ); + } +} diff --git a/src/Storage/Pdo/PdoConnectionFactory.php b/src/Storage/Pdo/PdoConnectionFactory.php new file mode 100644 index 0000000..f72ab64 --- /dev/null +++ b/src/Storage/Pdo/PdoConnectionFactory.php @@ -0,0 +1,38 @@ + $options + */ + public static function create( + string $dsn, + ?string $username = null, + ?string $password = null, + array $options = [], + ): PDO { + $defaultOptions = [ + PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION, + PDO::ATTR_DEFAULT_FETCH_MODE => PDO::FETCH_ASSOC, + PDO::ATTR_STRINGIFY_FETCHES => false, + ]; + + return new PDO($dsn, $username, $password, $options + $defaultOptions); + } + + public static function sqlite(string $databasePath): PDO + { + return self::create('sqlite:' . $databasePath); + } + + public static function sqliteMemory(): PDO + { + return self::create('sqlite::memory:'); + } +} diff --git a/src/Storage/Pdo/PdoMessageStore.php b/src/Storage/Pdo/PdoMessageStore.php new file mode 100644 index 0000000..bb4a475 --- /dev/null +++ b/src/Storage/Pdo/PdoMessageStore.php @@ -0,0 +1,101 @@ +prepare( + 'INSERT INTO messages (id, room_id, from_user_id, kind, body, metadata_json, created_at) + VALUES (:id, :room_id, :from_user_id, :kind, :body, :metadata_json, :created_at)', + ); + $statement->execute([ + 'id' => $message->id, + 'room_id' => $message->roomId, + 'from_user_id' => $message->fromUserId, + 'kind' => $message->kind, + 'body' => $message->body, + 'metadata_json' => json_encode($message->metadata, JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR), + 'created_at' => $message->createdAt->format(DATE_ATOM), + ]); + } + + public function messagesForRoom(string $roomId, int $limit = 50): array + { + if ($limit <= 0) { + return []; + } + + $statement = $this->prepare( + 'SELECT id, room_id, from_user_id, kind, body, metadata_json, created_at + FROM messages + WHERE room_id = :room_id + ORDER BY created_at DESC + LIMIT :limit', + ); + $statement->bindValue(':room_id', $roomId); + $statement->bindValue(':limit', $limit, PDO::PARAM_INT); + $statement->execute(); + $rows = $statement->fetchAll(); + + $messages = array_map(fn (array $row): ChatMessage => $this->hydrate($row), $rows); + + return array_values(array_reverse($messages)); + } + + /** + * @param array $row + */ + private function hydrate(array $row): ChatMessage + { + return new ChatMessage( + id: (string) $row['id'], + roomId: (string) $row['room_id'], + fromUserId: (string) $row['from_user_id'], + kind: (string) $row['kind'], + body: $row['body'] === null ? null : (string) $row['body'], + createdAt: new DateTimeImmutable((string) $row['created_at']), + metadata: $this->decodeMetadata((string) $row['metadata_json']), + ); + } + + /** + * @return array + */ + private function decodeMetadata(string $json): array + { + $metadata = json_decode($json, true, 512, JSON_THROW_ON_ERROR); + + if (!is_array($metadata)) { + return []; + } + + /** @var array $metadata */ + return $metadata; + } + + private function prepare(string $sql): PDOStatement + { + $statement = $this->pdo->prepare($sql); + + if (!$statement instanceof PDOStatement) { + throw new StorageException('Failed to prepare SQL statement.'); + } + + return $statement; + } +} diff --git a/src/Storage/Pdo/PdoRoomStore.php b/src/Storage/Pdo/PdoRoomStore.php new file mode 100644 index 0000000..d7bb96b --- /dev/null +++ b/src/Storage/Pdo/PdoRoomStore.php @@ -0,0 +1,213 @@ +pdo->beginTransaction(); + + try { + $this->saveRoom($room); + $this->syncMembers($room); + $this->pdo->commit(); + } catch (\Throwable $exception) { + $this->pdo->rollBack(); + + throw $exception; + } + } + + public function find(string $roomId): ?Room + { + $statement = $this->prepare( + 'SELECT id, type, name, created_by, created_at + FROM rooms + WHERE id = :id + LIMIT 1', + ); + $statement->execute(['id' => $roomId]); + $row = $statement->fetch(); + + if (!is_array($row)) { + return null; + } + + return $this->hydrate($row); + } + + public function all(): array + { + $statement = $this->prepare( + 'SELECT id, type, name, created_by, created_at + FROM rooms + ORDER BY created_at ASC', + ); + $statement->execute(); + $rows = $statement->fetchAll(); + + return array_values(array_map(fn (array $row): Room => $this->hydrate($row), $rows)); + } + + public function visibleForUser(string $userId): array + { + $statement = $this->prepare( + 'SELECT r.id, r.type, r.name, r.created_by, r.created_at + FROM rooms r + LEFT JOIN room_members rm ON rm.room_id = r.id AND rm.user_id = :user_id + WHERE r.type = :global_type OR rm.user_id IS NOT NULL + ORDER BY r.created_at ASC', + ); + $statement->execute([ + 'user_id' => $userId, + 'global_type' => Room::TYPE_GLOBAL, + ]); + $rows = $statement->fetchAll(); + + return array_values(array_map(fn (array $row): Room => $this->hydrate($row), $rows)); + } + + public function addMember(string $roomId, string $userId): void + { + if (!$this->roomExists($roomId) || $this->memberExists($roomId, $userId)) { + return; + } + + $statement = $this->prepare( + 'INSERT INTO room_members (room_id, user_id, joined_at) + VALUES (:room_id, :user_id, :joined_at)', + ); + $statement->execute([ + 'room_id' => $roomId, + 'user_id' => $userId, + 'joined_at' => (new DateTimeImmutable())->format(DATE_ATOM), + ]); + } + + public function removeMember(string $roomId, string $userId): void + { + $statement = $this->prepare( + 'DELETE FROM room_members + WHERE room_id = :room_id AND user_id = :user_id', + ); + $statement->execute([ + 'room_id' => $roomId, + 'user_id' => $userId, + ]); + } + + private function saveRoom(Room $room): void + { + if ($this->roomExists($room->id)) { + $statement = $this->prepare( + 'UPDATE rooms + SET type = :type, name = :name, created_by = :created_by, created_at = :created_at + WHERE id = :id', + ); + } else { + $statement = $this->prepare( + 'INSERT INTO rooms (id, type, name, created_by, created_at) + VALUES (:id, :type, :name, :created_by, :created_at)', + ); + } + + $statement->execute([ + 'id' => $room->id, + 'type' => $room->type, + 'name' => $room->name, + 'created_by' => $room->createdBy, + 'created_at' => $room->createdAt->format(DATE_ATOM), + ]); + } + + private function syncMembers(Room $room): void + { + $delete = $this->prepare('DELETE FROM room_members WHERE room_id = :room_id'); + $delete->execute(['room_id' => $room->id]); + + foreach ($room->memberUserIds as $userId) { + $this->addMember($room->id, $userId); + } + } + + private function roomExists(string $roomId): bool + { + $statement = $this->prepare('SELECT 1 FROM rooms WHERE id = :id LIMIT 1'); + $statement->execute(['id' => $roomId]); + + return is_array($statement->fetch()); + } + + private function memberExists(string $roomId, string $userId): bool + { + $statement = $this->prepare( + 'SELECT 1 + FROM room_members + WHERE room_id = :room_id AND user_id = :user_id + LIMIT 1', + ); + $statement->execute([ + 'room_id' => $roomId, + 'user_id' => $userId, + ]); + + return is_array($statement->fetch()); + } + + /** + * @param array $row + */ + private function hydrate(array $row): Room + { + return new Room( + id: (string) $row['id'], + type: (string) $row['type'], + name: $row['name'] === null ? null : (string) $row['name'], + createdBy: (string) $row['created_by'], + memberUserIds: $this->memberUserIds((string) $row['id']), + createdAt: new DateTimeImmutable((string) $row['created_at']), + ); + } + + /** + * @return list + */ + private function memberUserIds(string $roomId): array + { + $statement = $this->prepare( + 'SELECT user_id + FROM room_members + WHERE room_id = :room_id + ORDER BY joined_at ASC', + ); + $statement->execute(['room_id' => $roomId]); + $rows = $statement->fetchAll(); + + return array_values(array_map(static fn (array $row): string => (string) $row['user_id'], $rows)); + } + + private function prepare(string $sql): PDOStatement + { + $statement = $this->pdo->prepare($sql); + + if (!$statement instanceof PDOStatement) { + throw new StorageException('Failed to prepare SQL statement.'); + } + + return $statement; + } +} diff --git a/src/Storage/Pdo/PdoSessionStore.php b/src/Storage/Pdo/PdoSessionStore.php new file mode 100644 index 0000000..26b946c --- /dev/null +++ b/src/Storage/Pdo/PdoSessionStore.php @@ -0,0 +1,204 @@ +saveUser($session); + $this->saveSession($session); + } + + public function findByUserId(string $userId): ?UserSession + { + return $this->findOne( + 'SELECT s.id AS session_id, s.user_id, u.display_name, u.normalized_display_name, s.connected, + s.connected_at, s.last_seen_at + FROM sessions s + INNER JOIN users u ON u.id = s.user_id + WHERE s.user_id = :user_id + ORDER BY s.connected_at DESC + LIMIT 1', + ['user_id' => $userId], + ); + } + + public function findBySessionId(string $sessionId): ?UserSession + { + return $this->findOne( + 'SELECT s.id AS session_id, s.user_id, u.display_name, u.normalized_display_name, s.connected, + s.connected_at, s.last_seen_at + FROM sessions s + INNER JOIN users u ON u.id = s.user_id + WHERE s.id = :session_id + LIMIT 1', + ['session_id' => $sessionId], + ); + } + + public function findConnectedByNormalizedDisplayName(string $normalizedDisplayName): ?UserSession + { + return $this->findOne( + 'SELECT s.id AS session_id, s.user_id, u.display_name, u.normalized_display_name, s.connected, + s.connected_at, s.last_seen_at + FROM sessions s + INNER JOIN users u ON u.id = s.user_id + WHERE u.normalized_display_name = :normalized_display_name + AND s.connected = 1 + ORDER BY s.connected_at DESC + LIMIT 1', + ['normalized_display_name' => $normalizedDisplayName], + ); + } + + public function connected(): array + { + $statement = $this->prepare( + 'SELECT s.id AS session_id, s.user_id, u.display_name, u.normalized_display_name, s.connected, + s.connected_at, s.last_seen_at + FROM sessions s + INNER JOIN users u ON u.id = s.user_id + WHERE s.connected = 1 + ORDER BY u.display_name ASC', + ); + $statement->execute(); + $rows = $statement->fetchAll(); + + return array_values(array_map(fn (array $row): UserSession => $this->hydrate($row), $rows)); + } + + public function disconnect(string $userId): void + { + $statement = $this->prepare( + 'UPDATE sessions + SET connected = 0, last_seen_at = :last_seen_at + WHERE user_id = :user_id', + ); + $statement->execute([ + 'last_seen_at' => (new DateTimeImmutable())->format(DATE_ATOM), + 'user_id' => $userId, + ]); + } + + private function saveUser(UserSession $session): void + { + $exists = $this->exists('SELECT 1 FROM users WHERE id = :id LIMIT 1', ['id' => $session->userId]); + + if ($exists) { + $statement = $this->prepare( + 'UPDATE users + SET display_name = :display_name, normalized_display_name = :normalized_display_name + WHERE id = :id', + ); + } else { + $statement = $this->prepare( + 'INSERT INTO users (id, display_name, normalized_display_name, created_at) + VALUES (:id, :display_name, :normalized_display_name, :created_at)', + ); + $statement->bindValue(':created_at', $session->connectedAt->format(DATE_ATOM)); + } + + $statement->bindValue(':id', $session->userId); + $statement->bindValue(':display_name', $session->displayName); + $statement->bindValue(':normalized_display_name', $session->normalizedDisplayName); + $statement->execute(); + } + + private function saveSession(UserSession $session): void + { + $exists = $this->exists('SELECT 1 FROM sessions WHERE id = :id LIMIT 1', ['id' => $session->sessionId]); + + if ($exists) { + $statement = $this->prepare( + 'UPDATE sessions + SET user_id = :user_id, connected = :connected, connected_at = :connected_at, last_seen_at = :last_seen_at + WHERE id = :id', + ); + } else { + $statement = $this->prepare( + 'INSERT INTO sessions (id, user_id, connected, connected_at, last_seen_at) + VALUES (:id, :user_id, :connected, :connected_at, :last_seen_at)', + ); + } + + $statement->bindValue(':id', $session->sessionId); + $statement->bindValue(':user_id', $session->userId); + $statement->bindValue(':connected', $session->connected ? 1 : 0, PDO::PARAM_INT); + $statement->bindValue(':connected_at', $session->connectedAt->format(DATE_ATOM)); + $statement->bindValue(':last_seen_at', $session->lastSeenAt->format(DATE_ATOM)); + $statement->execute(); + } + + /** + * @param array $parameters + */ + private function findOne(string $sql, array $parameters): ?UserSession + { + $statement = $this->prepare($sql); + $statement->execute($parameters); + $row = $statement->fetch(); + + if (!is_array($row)) { + return null; + } + + return $this->hydrate($row); + } + + /** + * @param array $parameters + */ + private function exists(string $sql, array $parameters): bool + { + $statement = $this->prepare($sql); + $statement->execute($parameters); + + return is_array($statement->fetch()); + } + + /** + * @param array $row + */ + private function hydrate(array $row): UserSession + { + return new UserSession( + sessionId: (string) $row['session_id'], + userId: (string) $row['user_id'], + displayName: (string) $row['display_name'], + normalizedDisplayName: (string) $row['normalized_display_name'], + connected: $this->boolValue($row['connected'] ?? false), + connectedAt: new DateTimeImmutable((string) $row['connected_at']), + lastSeenAt: new DateTimeImmutable((string) $row['last_seen_at']), + ); + } + + private function boolValue(mixed $value): bool + { + return $value === true || $value === 1 || $value === '1'; + } + + private function prepare(string $sql): PDOStatement + { + $statement = $this->pdo->prepare($sql); + + if (!$statement instanceof PDOStatement) { + throw new StorageException('Failed to prepare SQL statement.'); + } + + return $statement; + } +} diff --git a/tests/Integration/Storage/SqliteStoreTest.php b/tests/Integration/Storage/SqliteStoreTest.php new file mode 100644 index 0000000..e1ae263 --- /dev/null +++ b/tests/Integration/Storage/SqliteStoreTest.php @@ -0,0 +1,115 @@ +run('sqlite'); + + $sessions = new PdoSessionStore($pdo); + $rooms = new PdoRoomStore($pdo); + $messages = new PdoMessageStore($pdo); + + $william = UserSession::create('William', 'william'); + $ana = UserSession::create('Ana', 'ana'); + $bruno = UserSession::create('Bruno', 'bruno'); + + $sessions->save($william); + $sessions->save($ana); + $sessions->save($bruno); + + self::assertSame($william->userId, $sessions->findBySessionId($william->sessionId)?->userId); + self::assertSame($ana->sessionId, $sessions->findByUserId($ana->userId)?->sessionId); + self::assertSame($bruno->userId, $sessions->findConnectedByNormalizedDisplayName('bruno')?->userId); + self::assertCount(3, $sessions->connected()); + + $global = new Room('global', Room::TYPE_GLOBAL, 'Global', 'system', [], new DateTimeImmutable('2026-01-01T00:00:00+00:00')); + $direct = new Room( + 'direct_william_ana', + Room::TYPE_DIRECT, + null, + $william->userId, + [$william->userId, $ana->userId], + new DateTimeImmutable('2026-01-01T00:00:01+00:00'), + ); + $group = new Room( + 'room_project', + Room::TYPE_PRIVATE_GROUP, + 'Project', + $william->userId, + [$william->userId, $ana->userId, $bruno->userId], + new DateTimeImmutable('2026-01-01T00:00:02+00:00'), + ); + + $rooms->save($global); + $rooms->save($direct); + $rooms->save($group); + + $messages->save(ChatMessage::text( + roomId: $global->id, + fromUserId: $william->userId, + text: 'Hello global', + metadata: ['clientMessageId' => 'client_global'], + )); + $messages->save(ChatMessage::text( + roomId: $direct->id, + fromUserId: $william->userId, + text: 'Hello Ana', + metadata: ['clientMessageId' => 'client_direct'], + )); + $messages->save(ChatMessage::text( + roomId: $group->id, + fromUserId: $bruno->userId, + text: 'Hello group', + metadata: ['clientMessageId' => 'client_group'], + )); + + $globalMessages = $messages->messagesForRoom($global->id); + $directMessages = $messages->messagesForRoom($direct->id); + $groupMessages = $messages->messagesForRoom($group->id); + + self::assertSame('client_global', $globalMessages[0]->metadata['clientMessageId'] ?? null); + self::assertSame('client_direct', $directMessages[0]->metadata['clientMessageId'] ?? null); + self::assertSame('client_group', $groupMessages[0]->metadata['clientMessageId'] ?? null); + + self::assertSame(['global', 'direct_william_ana', 'room_project'], $this->roomIds($rooms->visibleForUser($william->userId))); + self::assertSame(['global', 'direct_william_ana', 'room_project'], $this->roomIds($rooms->visibleForUser($ana->userId))); + self::assertSame(['global', 'room_project'], $this->roomIds($rooms->visibleForUser($bruno->userId))); + + $sessions->disconnect($ana->userId); + + self::assertFalse($sessions->findByUserId($ana->userId)?->connected); + self::assertCount(2, $sessions->connected()); + } + + /** + * @param list $rooms + * + * @return list + */ + private function roomIds(array $rooms): array + { + return array_map(static fn (Room $room): string => $room->id, $rooms); + } +} diff --git a/tests/Unit/Storage/InMemoryStoreTest.php b/tests/Unit/Storage/InMemoryStoreTest.php new file mode 100644 index 0000000..7d2ee6a --- /dev/null +++ b/tests/Unit/Storage/InMemoryStoreTest.php @@ -0,0 +1,79 @@ +save($william); + $store->save($ana); + + self::assertSame($william, $store->findByUserId($william->userId)); + self::assertSame($ana, $store->findBySessionId($ana->sessionId)); + self::assertSame($william, $store->findConnectedByNormalizedDisplayName('william')); + self::assertSame([$william, $ana], $store->connected()); + + $store->disconnect($william->userId); + + self::assertFalse($william->connected); + self::assertNull($store->findConnectedByNormalizedDisplayName('william')); + self::assertSame([$ana], $store->connected()); + } + + public function testRoomStoreSavesVisibilityAndMembershipChanges(): void + { + $store = new InMemoryRoomStore(); + $global = Room::global(); + $direct = Room::direct('direct_william_ana', ['usr_william', 'usr_ana'], 'usr_william'); + $group = Room::privateGroup('Project', 'usr_william', ['usr_william', 'usr_ana', 'usr_bruno']); + + $store->save($global); + $store->save($direct); + $store->save($group); + + self::assertSame($direct, $store->find('direct_william_ana')); + self::assertSame([$global, $direct, $group], $store->all()); + self::assertSame([$global, $direct, $group], $store->visibleForUser('usr_ana')); + self::assertSame([$global, $group], $store->visibleForUser('usr_bruno')); + + $store->addMember($direct->id, 'usr_bruno'); + + self::assertTrue($store->find($direct->id)?->hasMember('usr_bruno')); + + $store->removeMember($direct->id, 'usr_ana'); + + self::assertFalse($store->find($direct->id)?->hasMember('usr_ana')); + } + + public function testMessageStoreSavesMessagesAndAppliesRoomLimit(): void + { + $store = new InMemoryMessageStore(); + $first = ChatMessage::text('global', 'usr_william', 'First'); + $second = ChatMessage::text('global', 'usr_ana', 'Second'); + $third = ChatMessage::text('direct_william_ana', 'usr_william', 'Private'); + + $store->save($first); + $store->save($second); + $store->save($third); + + self::assertSame([$first, $second], $store->messagesForRoom('global')); + self::assertSame([$second], $store->messagesForRoom('global', 1)); + self::assertSame([], $store->messagesForRoom('global', 0)); + self::assertSame([$third], $store->messagesForRoom('direct_william_ana')); + } +}