diff --git a/composer.json b/composer.json index 22969a2..fcd58b3 100644 --- a/composer.json +++ b/composer.json @@ -22,6 +22,7 @@ ], "require": { "php": ">=8.1", + "ext-ffi": "*", "ext-sockets": "*", "ext-openssl": "*", "psr/http-message": "*", diff --git a/docs/EXCEPTION_HANDLING.md b/docs/EXCEPTION_HANDLING.md new file mode 100644 index 0000000..da5766d --- /dev/null +++ b/docs/EXCEPTION_HANDLING.md @@ -0,0 +1,214 @@ +# Stream Exception Handling Guide + +## Overview + +The Stream module uses a clear exception hierarchy that separates internal control-flow exceptions from application-level exceptions that user code should handle. + +## Exception Hierarchy + +``` +RuntimeException +├── StreamException (base for user-catchable exceptions) +│ └── TransportException (recoverable transport errors) +│ ├── TransportTimeoutException (timeout errors) +│ ├── ConnectionTimeoutException (connection timeouts) +│ └── WriteClosedException (write to closed stream) +└── ConnectionException (internal control-flow - DO NOT CATCH) +``` + +## Key Principles + +### 1. Internal Control-Flow Exceptions (DO NOT CATCH) + +**ConnectionException** is an internal exception used exclusively by the reactor for connection termination. It implements the `AbortConnection` marker interface. + +- **Purpose**: Signal immediate connection termination to the reactor +- **Usage**: Only thrown internally when connection becomes unusable +- **Handling**: Only caught by reactor's exception boundary, never by user code + +```php +// ❌ NEVER do this +try { + $stream->read(1024); +} catch (ConnectionException $e) { + // This breaks the reactor's control flow! +} + +// ✅ Use connection lifecycle events instead +$stream->onClose(function (CloseEvent $event) { + echo "Connection closed: {$event->reason->value}"; +}); +``` + +### 2. Application-Level Exceptions (Safe to Catch) + +**TransportException** and its subclasses represent recoverable errors that application code can handle. + +```php +// ✅ Safe to catch and handle +try { + $stream->write($data); +} catch (WriteClosedException $e) { + // Write side is closed, but connection might still be readable + echo "Cannot write: {$e->getMessage()}"; +} catch (TransportException $e) { + // Other transport-level errors + echo "Transport error: {$e->getMessage()}"; +} +``` + +## Connection Lifecycle Events + +Instead of catching exceptions, use lifecycle events to handle connection state changes: + +### onClose(CloseEvent) +Triggered once when the connection terminates. + +```php +$stream->onClose(function (CloseEvent $event) { + echo "Closed: {$event->reason->value} by {$event->initiator}"; + // Cleanup resources, update connection pools, etc. +}); +``` + +**CloseEvent** provides: +- `reason`: ConnectionAbortReason enum (PEER_CLOSED, RESET, TIMEOUT, etc.) +- `initiator`: 'peer' | 'local' | 'system' +- `message`: Optional descriptive message +- `lastError`: Optional underlying exception +- `timestamp`: When the close occurred + +### onReadableEnd() +Triggered when the read side closes (EOF) but connection may still be writable. + +```php +$stream->onReadableEnd(function () use ($stream) { + echo "Read side closed - no more data from peer"; + // Can still write final response, then close + $stream->write("HTTP/1.1 200 OK\r\n\r\nGoodbye"); + $stream->close(); +}); +``` + +### onWritableEnd() +Triggered when the write side closes but connection may still be readable. + +```php +$stream->onWritableEnd(function () use ($stream) { + echo "Write side closed - cannot send more data"; + // Can still read remaining data from peer +}); +``` + +## Half-Close Support + +Half-close allows one side of a connection to be closed while the other remains open. This is useful for protocols like HTTP where the client sends a complete request, then the server sends a complete response. + +### Configuration + +```php +$stream = new Stream($resource); +$stream->supportsHalfClose = true; // Default: true +``` + +### Behavior + +When `supportsHalfClose = true`: +- `read()` returning EOF triggers `onReadableEnd()` if registered, otherwise throws `ConnectionException` +- `write()` getting EPIPE triggers `onWritableEnd()` if registered, otherwise throws `ConnectionException` + +When `supportsHalfClose = false`: +- EOF or EPIPE immediately throws `ConnectionException` for reactor termination + +## Error Classification + +### Fatal Errors (→ ConnectionException) +These errors indicate the connection is no longer usable: +- Peer closed connection (EOF without half-close support) +- Connection reset by peer (ECONNRESET) +- TLS fatal alerts +- Broken pipe (EPIPE without half-close support) + +### Recoverable Errors (→ TransportException) +These errors can be handled by application logic: +- Connection timeouts (can retry) +- Write to closed stream (can detect and handle) +- Protocol-level errors (application can decide response) +- Temporary resource unavailability + +## Migration from Old API + +### Before +```php +try { + $data = $stream->read(1024); +} catch (ConnectionException $e) { + if ($e->getCode() === ConnectionException::CONNECTION_CLOSED) { + // Handle close + } +} +``` + +### After +```php +// Use events for lifecycle management +$stream->onClose(function (CloseEvent $event) { + if ($event->reason === ConnectionAbortReason::PEER_CLOSED) { + // Handle close + } +}); + +$stream->onReadableEnd(function () { + // Handle EOF/half-close +}); + +// Only catch recoverable exceptions +try { + $data = $stream->read(1024); +} catch (TransportException $e) { + // Handle recoverable errors only +} +``` + +## Best Practices + +1. **Never catch ConnectionException** - use lifecycle events instead +2. **Register onClose for cleanup** - guaranteed to be called once per connection +3. **Use onReadableEnd/onWritableEnd** for half-close protocols +4. **Catch TransportException** for recoverable error handling +5. **Let the reactor handle fatal errors** - it will clean up and emit events + +## Common Patterns + +### HTTP Server +```php +$stream->onReadableEnd(function () use ($stream, $response) { + // Client finished sending request, send response + $stream->write($response); + $stream->close(); +}); + +$stream->onClose(function (CloseEvent $event) use ($connectionPool) { + $connectionPool->remove($stream); +}); +``` + +### Database Client +```php +$stream->onClose(function (CloseEvent $event) use ($pendingQueries) { + // Fail all pending queries + foreach ($pendingQueries as $query) { + $query->fail(new TransportException("Connection lost: {$event->reason->value}")); + } +}); +``` + +### WebSocket +```php +$stream->onClose(function (CloseEvent $event) use ($subscriptions) { + // Clean up subscriptions + foreach ($subscriptions as $sub) { + $sub->cancel(); + } +}); +``` \ No newline at end of file diff --git a/examples/enhanced-stream-usage.php b/examples/enhanced-stream-usage.php new file mode 100644 index 0000000..a205ef0 --- /dev/null +++ b/examples/enhanced-stream-usage.php @@ -0,0 +1,67 @@ +setBlocking(false); + + // Register connection lifecycle events + $stream->onClose(function (CloseEvent $event) { + Output::info("Connection closed: {$event->reason->value} by {$event->initiator}"); + }); + + $stream->onReadableEnd(function () use ($stream) { + Output::info("Read side closed - server finished sending response"); + // We can still write if needed, but in HTTP we typically close now + $stream->close(); + }); + + // Send HTTP request + $request = "GET /get HTTP/1.1\r\nHost: httpbin.org\r\nConnection: close\r\n\r\n"; + $stream->write($request); + + // Read response + $stream->onReadable(function () use ($stream) { + try { + $data = $stream->read(1024); + if ($data !== '') { + echo $data; + } + // Note: When server closes connection, onReadableEnd will be triggered + // instead of an exception, allowing graceful handling + } catch (WriteClosedException $e) { + // This is a recoverable exception - safe to catch + Output::warning("Attempted to write to closed connection: {$e->getMessage()}"); + } catch (TransportException $e) { + // This is a recoverable exception - safe to catch + Output::error("Transport error: {$e->getMessage()}"); + $stream->close(); + } + // Note: We NEVER catch ConnectionException - that's handled internally by the reactor + }); + +} catch (TransportException $e) { + // Connection establishment failed - this is recoverable + Output::error("Failed to connect: {$e->getMessage()}"); + exit(1); +} + +wait(); diff --git a/examples/socket-client.php b/examples/socket-client.php index 0c262fd..efab6a7 100644 --- a/examples/socket-client.php +++ b/examples/socket-client.php @@ -3,7 +3,7 @@ include __DIR__ . '/../vendor/autoload.php'; use Ripple\Socket; -use Ripple\Stream\Exception\ConnectionException; +use Ripple\Stream\Exception\TransportException; use Ripple\Utils\Output; use function Co\wait; @@ -14,7 +14,7 @@ # Enable SSL // $connection->enableSSL(); $connection->setBlocking(false); -} catch (ConnectionException $e) { +} catch (TransportException $e) { Output::warning($e->getMessage()); exit(1); } diff --git a/examples/socket-tunnel.php b/examples/socket-tunnel.php index 5ef60be..411402a 100644 --- a/examples/socket-tunnel.php +++ b/examples/socket-tunnel.php @@ -2,7 +2,7 @@ include __DIR__ . '/../vendor/autoload.php'; -use Ripple\Stream\Exception\ConnectionException; +use Ripple\Stream\Exception\TransportException; use Ripple\Tunnel\Socks5; use Ripple\Utils\Output; @@ -26,7 +26,7 @@ } echo $data; }); -} catch (ConnectionException $e) { +} catch (TransportException $e) { Output::warning($e->getMessage()); exit(1); } @@ -53,7 +53,7 @@ } echo $data; }); -} catch (ConnectionException $e) { +} catch (TransportException $e) { Output::warning($e->getMessage()); exit(1); } diff --git a/src/Channel/Channel.php b/src/Channel/Channel.php index 6f28487..17dba52 100644 --- a/src/Channel/Channel.php +++ b/src/Channel/Channel.php @@ -17,7 +17,8 @@ use Ripple\File\Lock; use Ripple\Kernel; use Ripple\Stream; -use Ripple\Stream\Exception\ConnectionException; +use Ripple\Stream\Exception\AbortConnection; +use Ripple\Stream\Exception\TransportException; use Ripple\Utils\Path; use Ripple\Utils\Serialization\Zx7e; use Throwable; @@ -193,14 +194,25 @@ public function receive(bool $blocking = true): mixed if ($this->readLock->lock(blocking: false)) { try { - if ($this->stream->read(1) === chr(Channel::FRAME_HEADER)) { + $header = $this->stream->read(1); + if ($header === chr(Channel::FRAME_HEADER)) { break; + } elseif ($header === '') { + // EOF - connection closed + $this->readLock->unlock(); + throw new TransportException('Connection closed while reading frame header'); } else { + // Invalid frame header $this->stream->close(); - throw new ConnectionException('Failed to read frame header.'); + throw new TransportException('Invalid frame header received'); } - } catch (ConnectionException) { + } catch (AbortConnection $e) { + // Internal control flow exception - re-throw to let reactor handle $this->readLock->unlock(); + throw $e; + } catch (TransportException $e) { + $this->readLock->unlock(); + throw $e; } } } diff --git a/src/Coroutine/Coroutine.php b/src/Coroutine/Coroutine.php index 230cf8b..4a2f4ae 100644 --- a/src/Coroutine/Coroutine.php +++ b/src/Coroutine/Coroutine.php @@ -101,6 +101,50 @@ private function registerOnFork(): void }); } + /** + * @param Closure $closure + * + * @return Context + */ + public function go(Closure $closure): Context + { + $context = null; + $parentContext = getContext(); + $context = new Context(function () use ($closure, $parentContext, &$context) { + Context::extend($parentContext); + + try { + $this->dispatchEvent(new StartEvent($context)); + $result = $closure(); + $this->dispatchEvent(new CompleteEvent($context, $result)); + + } catch (EscapeException $exception) { + throw $exception; + + } catch (Throwable $exception) { + $this->dispatchEvent(new ErrorEvent($context, $exception)); + + } finally { + $this->dispatchEvent(new FinallyEvent($context)); + + Context::clear(); + $this->tracer->clear($context); + } + }); + + $this->fiber2context[$context->fiber] = WeakReference::create($context); + + try { + $context->start(); + } catch (EscapeException $exception) { + $this->handleEscapeException($exception); + } catch (Throwable $exception) { + // 有预期之外的异常泄漏 + Output::exception($exception); + } + return $context; + } + /** * * The coroutine that cannot be restored can only throw an exception. @@ -168,6 +212,41 @@ public static function throw(Context $context, Throwable $exception): void } } + /** + * @return Context + */ + public function getContext(): Context + { + if (!$fiber = Fiber::getCurrent()) { + return new SuspensionProxy(EventLoop::getSuspension()); + } + return ($this->fiber2context[$fiber] ?? null)?->get() ?? new SuspensionProxy(EventLoop::getSuspension()); + } + + /** + * @param EscapeException $exception + * + * @return void + */ + #[NoReturn] + public function handleEscapeException(EscapeException $exception): void + { + Process::getInstance()->processedInMain($exception->lastWords); + } + + /** + * @param int|float $second + * + * @return int|float + * @throws Throwable + */ + public function sleep(int|float $second): int|float + { + $context = getContext(); + delay(static fn () => Coroutine::resume($context, $second), $second); + return Coroutine::suspend($context); + } + /** * @param Closure $closure * @@ -189,50 +268,6 @@ public function async(Closure $closure): Promise }); } - /** - * @param Closure $closure - * - * @return Context - */ - public function go(Closure $closure): Context - { - $context = null; - $parentContext = getContext(); - $context = new Context(function () use ($closure, $parentContext, &$context) { - Context::extend($parentContext); - - try { - $this->dispatchEvent(new StartEvent($context)); - $result = $closure(); - $this->dispatchEvent(new CompleteEvent($context, $result)); - - } catch (EscapeException $exception) { - throw $exception; - - } catch (Throwable $exception) { - $this->dispatchEvent(new ErrorEvent($context, $exception)); - - } finally { - $this->dispatchEvent(new FinallyEvent($context)); - - Context::clear(); - $this->tracer->clear($context); - } - }); - - $this->fiber2context[$context->fiber] = WeakReference::create($context); - - try { - $context->start(); - } catch (EscapeException $exception) { - $this->handleEscapeException($exception); - } catch (Throwable $exception) { - // 有预期之外的异常泄漏 - Output::exception($exception); - } - return $context; - } - /** * This method is different from onReject, which allows accepting any type of rejected futures object. * When await promise is rejected, an error will be thrown instead of returning the rejected value. @@ -285,41 +320,6 @@ public function await(Promise $promise): mixed return $result; } - /** - * @return Context - */ - public function getContext(): Context - { - if (!$fiber = Fiber::getCurrent()) { - return new SuspensionProxy(EventLoop::getSuspension()); - } - return ($this->fiber2context[$fiber] ?? null)?->get() ?? new SuspensionProxy(EventLoop::getSuspension()); - } - - /** - * @param EscapeException $exception - * - * @return void - */ - #[NoReturn] - public function handleEscapeException(EscapeException $exception): void - { - Process::getInstance()->processedInMain($exception->lastWords); - } - - /** - * @param int|float $second - * - * @return int|float - * @throws Throwable - */ - public function sleep(int|float $second): int|float - { - $context = getContext(); - delay(static fn () => Coroutine::resume($context, $second), $second); - return Coroutine::suspend($context); - } - /** * @param Event $event * diff --git a/src/File/File.php b/src/File/File.php index d4385ed..7acbbb8 100644 --- a/src/File/File.php +++ b/src/File/File.php @@ -12,16 +12,11 @@ namespace Ripple\File; -use Ripple\Coroutine; -use Ripple\File\Exception\FileException; -use Ripple\Kernel; use Ripple\Stream; use Ripple\Support; -use Throwable; use function array_shift; use function Co\forked; -use function Co\getContext; use function fopen; use function file_get_contents; @@ -56,42 +51,14 @@ private function registerOnFork(): void } /** + * @deprecated * @param string $path * * @return string|false - * @throws FileException */ public static function getContents(string $path): string|false { - if (Kernel::getInstance()->getLibEventMethod() === 'epoll') { - return file_get_contents($path); - } - - try { - if (!$resource = fopen($path, 'r')) { - throw (new FileException('Failed to open file: ' . $path)); - } - - $stream = new Stream($resource); - $stream->setBlocking(false); - $content = ''; - $context = getContext(); - $stream->onReadable(static function (Stream $stream) use (&$content, $context) { - $fragment = ''; - while ($buffer = $stream->read(8192)) { - $fragment .= $buffer; - } - - $content .= $fragment; - if ($stream->eof()) { - $stream->close(); - Coroutine::resume($context, $content); - } - }); - return Coroutine::suspend(); - } catch (Throwable $exception) { - throw new FileException($exception->getMessage()); - } + return file_get_contents($path); } /** diff --git a/src/Kernel.php b/src/Kernel.php index 80a5a66..ec059f1 100644 --- a/src/Kernel.php +++ b/src/Kernel.php @@ -59,8 +59,8 @@ class Kernel /*** @var bool */ private bool $mainRunning = true; - /*** @var string */ - private string $libEventMethod; + /*** @var string|null */ + private string|null $libEventMethod = null; /** * diff --git a/src/Proc/Session.php b/src/Proc/Session.php index 78777a2..021d6db 100644 --- a/src/Proc/Session.php +++ b/src/Proc/Session.php @@ -14,7 +14,7 @@ use Closure; use Ripple\Kernel; -use Ripple\Stream\Exception\ConnectionException; +use Ripple\Stream\Exception\TransportException; use Ripple\Utils\Output; use Throwable; @@ -209,7 +209,7 @@ public function write(string $content): bool try { $this->streamStdInput->write($content); return true; - } catch (ConnectionException $exception) { + } catch (TransportException $exception) { Output::error($exception->getMessage()); $this->close(); return false; diff --git a/src/Socket.php b/src/Socket.php index cb1bee7..f7afe20 100644 --- a/src/Socket.php +++ b/src/Socket.php @@ -14,7 +14,10 @@ use Closure; use Ripple\Coroutine\Exception\Exception; +use Ripple\Stream\ConnectionAbortReason; use Ripple\Stream\Exception\ConnectionException; +use Ripple\Stream\Exception\TransportException; +use Ripple\Stream\Exception\TransportTimeoutException; use RuntimeException; use Throwable; @@ -143,7 +146,7 @@ public static function connect(string $address, int|float $timeout = 0, mixed $c ); if (!$connection) { - throw (new ConnectionException('Failed to connect to the server.', ConnectionException::CONNECTION_ERROR)); + throw new TransportException('Failed to connect to the server.'); } $stream = new static($connection, $address); @@ -153,7 +156,7 @@ public static function connect(string $address, int|float $timeout = 0, mixed $c return $stream; } catch (Throwable $e) { $stream->close(); - throw new ConnectionException($e->getMessage()); + throw new TransportException('Connection establishment failed: ' . $e->getMessage(), 0, $e); } } @@ -177,7 +180,7 @@ public function enableSSL( if ($timeout > 0) { $timeoutEventID = delay(function () use ($reject) { $this->close(); - $reject(new ConnectionException('SSL handshake timeout.', ConnectionException::CONNECTION_TIMEOUT)); + $reject(new TransportTimeoutException('SSL handshake timeout')); }, $timeout); $promise->finally(static fn () => cancel($timeoutEventID)); } @@ -190,7 +193,7 @@ public function enableSSL( if ($handshakeResult === false) { $stream->close(); - $reject(new ConnectionException('Failed to enable crypto.', ConnectionException::CONNECTION_CRYPTO)); + $reject(new TransportException('Failed to enable crypto')); return; } @@ -228,8 +231,8 @@ public function enableSSL( } })->await(); } catch (Throwable $exception) { - // ConnectionException - throw new ConnectionException('Failed to enable SSL.', ConnectionException::CONNECTION_CRYPTO, $exception); + // SSL handshake failure - this is a transport-level error, not internal control flow + throw new TransportException('Failed to enable SSL: ' . $exception->getMessage(), 0, $exception); } } @@ -299,8 +302,11 @@ public function receive(int $length, mixed &$target, int|null $flags = 0): int { $realLength = socket_recv($this->socket, $target, $length, $flags); if ($realLength === false) { - $this->close(); - throw new ConnectionException('Unable to read from stream', ConnectionException::CONNECTION_READ_FAIL); + throw new ConnectionException(ConnectionAbortReason::RESET, 'Unable to read from socket'); + } + if ($realLength === 0) { + // EOF on socket receive + throw new ConnectionException(ConnectionAbortReason::PEER_CLOSED, 'Peer closed connection'); } return $realLength; } @@ -316,8 +322,7 @@ public function write(string $string): int try { return $this->writeInternal($string); } catch (Throwable $exception) { - $this->close(); - throw new ConnectionException($exception->getMessage(), ConnectionException::CONNECTION_WRITE_FAIL); + throw new ConnectionException(ConnectionAbortReason::WRITE_FAILURE, 'Socket write failed: ' . $exception->getMessage(), $exception); } } diff --git a/src/Stream.php b/src/Stream.php index ee58355..217ed8a 100644 --- a/src/Stream.php +++ b/src/Stream.php @@ -16,9 +16,13 @@ use Revolt\EventLoop; use Ripple\Coroutine\Context; use Ripple\Coroutine\Coroutine; -use Ripple\Stream\Exception\ConnectionCloseException; +use Ripple\Stream\CloseEvent; +use Ripple\Stream\ConnectionAbortReason; +use Ripple\Stream\Exception\AbortConnection; use Ripple\Stream\Exception\ConnectionException; use Ripple\Stream\Exception\ConnectionTimeoutException; +use Ripple\Stream\Exception\TransportException; +use Ripple\Stream\Exception\WriteClosedException; use Ripple\Stream\Stream as StreamBase; use Ripple\Utils\Format; use Ripple\Utils\Output; @@ -29,32 +33,42 @@ use function Co\async; use function Co\cancel; use function Co\getContext; +use function feof; +use function fread; use function is_resource; use function stream_set_blocking; use function get_resource_id; use function substr; use function strlen; use function end; +use function error_get_last; +use function str_contains; /** - * 2024/09/21 + * Enhanced Stream class with proper connection lifecycle management * - * After production testing and the design architecture of the current framework, the following decisions can meet the needs of the existing design, - * so the following decisions are made: + * This class provides application-layer stream functionality with: + * - Event-driven I/O (onReadable/onWriteable) + * - Connection lifecycle events (onClose/onReadableEnd/onWritableEnd) + * - Half-close support for protocols that need it + * - Proper exception handling with clear separation between: + * - Internal control-flow exceptions (ConnectionException - DO NOT CATCH) + * - Application-level exceptions (TransportException and subclasses - safe to catch) * - * This class only focuses on the reliability of events and does not guarantee data integrity issues caused by write and buffer size. - * It is positioned as a Stream in the application layer. + * Connection Lifecycle Events: + * - onClose(CloseEvent): Triggered once when connection terminates, includes reason and initiator + * - onReadableEnd(): Triggered when read side closes (EOF) but write may continue (half-close) + * - onWritableEnd(): Triggered when write side closes but read may continue * - * Standards that are as safe and easy to use as possible should be followed, allowing some performance to be lost. For more - * fine-grained control, please use the StreamBase class. + * Exception Handling: + * - ConnectionException: Internal reactor control-flow exception - NEVER catch this in user code + * - TransportException: Recoverable transport errors - safe to catch and handle + * - WriteClosedException: Attempt to write to closed write side - safe to catch * - * Provide onReadable/onWriteable methods for monitoring readable and writeable events, and any uncaught ConnectionException - * that occurs in the event will cause the Stream to close - * - * Both the onReadable and onWritable methods will automatically cancel the previous monitoring. - * - * The closed stream will automatically log out all monitored events. If there is a transaction, it will automatically - * mark the transaction as failed. + * Half-Close Support: + * - When supportsHalfClose=true and onReadableEnd/onWritableEnd callbacks are registered, + * EOF conditions trigger the respective end events instead of immediate connection termination + * - This allows protocols like HTTP to continue writing responses after reading the complete request * * @Author cclilshy * @Date 2024/8/16 09:37 @@ -76,11 +90,41 @@ class Stream extends StreamBase */ protected array $onCloseCallbacks = array(); + /** + * @var array + */ + protected array $onReadableEndCallbacks = array(); + + /** + * @var array + */ + protected array $onWritableEndCallbacks = array(); + /** * @var int */ protected int $index = 0; + /** + * @var bool + */ + protected bool $isReadOpen = true; + + /** + * @var bool + */ + protected bool $isWriteOpen = true; + + /** + * @var bool + */ + protected bool $isClosing = false; + + /** + * @var bool + */ + protected bool $supportsHalfClose = true; + /** * @var int $id */ @@ -129,7 +173,7 @@ public function waitForReadable(int|float $timeout = 0): bool $closeOID = $this->onClose(static function () use ($context) { Coroutine::throw( $context, - new ConnectionCloseException('Stream has been closed', null) + new TransportException('Stream has been closed') ); }); @@ -164,6 +208,17 @@ public function onReadable(Closure $closure): string return $this->onReadable = EventLoop::onReadable($this->stream, function () use ($closure) { try { call_user_func_array($closure, [$this, fn () => $this->cancelReadable()]); + } catch (AbortConnection $e) { + // Internal control-flow exception - close connection immediately + $this->cancelReadable(); + $this->cancelWriteable(); + parent::close(); + + // Extract reason from ConnectionException if available + $reason = ($e instanceof ConnectionException) + ? $e->reason + : ConnectionAbortReason::RESET; + $this->emitCloseEvent($reason, 'system', $e); } catch (Throwable $exception) { Output::error($exception->getMessage()); } @@ -202,6 +257,114 @@ public function cancelOnClose(string $key): void unset($this->onCloseCallbacks[$key]); } + /** + * Register callback for readable end event (EOF/half-close) + * + * @param Closure $closure + * @return string + */ + public function onReadableEnd(Closure $closure): string + { + $this->onReadableEndCallbacks[$key = Format::int2string($this->index++)] = $closure; + return $key; + } + + /** + * @param string $key + * @return void + */ + public function cancelOnReadableEnd(string $key): void + { + unset($this->onReadableEndCallbacks[$key]); + } + + /** + * Register callback for writable end event (write side closed) + * + * @param Closure $closure + * @return string + */ + public function onWritableEnd(Closure $closure): string + { + $this->onWritableEndCallbacks[$key = Format::int2string($this->index++)] = $closure; + return $key; + } + + /** + * @param string $key + * @return void + */ + public function cancelOnWritableEnd(string $key): void + { + unset($this->onWritableEndCallbacks[$key]); + } + + /** + * @internal + * Emit readable end event (EOF/half-close) + */ + private function emitReadableEnd(): void + { + if (!$this->isReadOpen) { + return; // Already emitted + } + + $this->isReadOpen = false; + $this->cancelReadable(); + + foreach ($this->onReadableEndCallbacks as $callback) { + try { + call_user_func($callback, $this); + } catch (Throwable $exception) { + Output::error("Error in onReadableEnd callback: " . $exception->getMessage()); + } + } + } + + /** + * @internal + * Emit writable end event (write side closed) + */ + private function emitWritableEnd(): void + { + if (!$this->isWriteOpen) { + return; // Already emitted + } + + $this->isWriteOpen = false; + $this->cancelWriteable(); + + foreach ($this->onWritableEndCallbacks as $callback) { + try { + call_user_func($callback, $this); + } catch (Throwable $exception) { + Output::error("Error in onWritableEnd callback: " . $exception->getMessage()); + } + } + } + + /** + * @internal + * Emit close event with reason + */ + private function emitCloseEvent(ConnectionAbortReason $reason, string $initiator, Throwable|null $lastError = null): void + { + if ($this->isClosing) { + return; // Already closing + } + + $this->isClosing = true; + $closeEvent = new CloseEvent($reason, $initiator, null, $lastError); + + foreach ($this->onCloseCallbacks as $callback) { + try { + call_user_func($callback, $closeEvent); + } catch (Throwable $exception) { + Output::error("Error in onClose callback: " . $exception->getMessage()); + } + } + } + /** * @return void */ @@ -217,13 +380,8 @@ public function close(): void $this->cancelReadable(); $this->cancelWriteable(); - foreach ($this->onCloseCallbacks as $callback) { - try { - call_user_func($callback); - } catch (Throwable $exception) { - Output::error($exception->getMessage()); - } - } + // Emit close event with local initiator + $this->emitCloseEvent(ConnectionAbortReason::LOCAL_CLOSE, 'local'); } /** @@ -269,7 +427,7 @@ public function waitForWriteable(int|float $timeout = 0): bool $closeOID = $this->onClose(static function () use ($context) { Coroutine::throw( $context, - new ConnectionCloseException('Stream has been closed') + new TransportException('Stream has been closed') ); }); @@ -306,12 +464,53 @@ public function onWriteable(Closure $closure): string return $this->onWriteable = EventLoop::onWritable($this->stream, function () use ($closure) { try { call_user_func_array($closure, [$this, fn () => $this->cancelWriteable()]); + } catch (AbortConnection $e) { + // Internal control-flow exception - close connection immediately + $this->cancelReadable(); + $this->cancelWriteable(); + parent::close(); + + // Extract reason from ConnectionException if available + $reason = ($e instanceof ConnectionException) + ? $e->reason + : ConnectionAbortReason::RESET; + $this->emitCloseEvent($reason, 'system', $e); } catch (Throwable $exception) { Output::error($exception->getMessage()); } }); } + /** + * Override parent read method to support half-close detection + * + * @param int $length + * @return string + * @throws ConnectionException + */ + public function read(int $length): string + { + $content = @fread($this->stream, $length); + if ($content === false) { + // Fatal I/O error - throw internal exception for immediate termination + throw new ConnectionException(ConnectionAbortReason::RESET, 'Unable to read from stream'); + } + + if ($content === '' && feof($this->stream)) { + // EOF detected - handle based on half-close support + if ($this->supportsHalfClose && !empty($this->onReadableEndCallbacks)) { + // Half-close supported and user has registered onReadableEnd + $this->emitReadableEnd(); + return $content; // Return empty string, don't throw + } else { + // No half-close support or no onReadableEnd handler - treat as fatal + throw new ConnectionException(ConnectionAbortReason::PEER_CLOSED, 'Peer closed connection'); + } + } + + return $content; + } + /** * @param int $length * @@ -343,15 +542,20 @@ public function readContinuously(int $length): string * @param string $string * * @return int - * @throws ConnectionException + * @throws ConnectionException|WriteClosedException */ public function write(string $string): int { + // Check if write side is already closed + if (!$this->isWriteOpen) { + throw new WriteClosedException('Write side of connection is closed'); + } + $this->buffer .= $string; try { if (!$this->clearBufferIsRunning) { - $writeLength = parent::write($this->buffer); + $writeLength = $this->writeToSocket($this->buffer); $this->buffer = substr($this->buffer, $writeLength); if ($this->buffer === '') { @@ -361,15 +565,52 @@ public function write(string $string): int $this->startClearBuffer(); } - $this->clearBufferWaiters[] = getContext(); - Coroutine::suspend(end($this->clearBufferWaiters)); + // Only suspend if there's still buffered data to write + if ($this->buffer !== '') { + $this->clearBufferWaiters[] = getContext(); + Coroutine::suspend(end($this->clearBufferWaiters)); + } return strlen($string); - } catch (Throwable) { + } catch (AbortConnection $e) { + // Re-throw internal control flow exception + throw $e; + } catch (Throwable $e) { $this->close(); - throw new ConnectionException('Unable to write to stream', ConnectionException::CONNECTION_WRITE_FAIL); + throw new TransportException('Write operation failed: ' . $e->getMessage(), 0, $e); } } + /** + * @internal + * Handle low-level write with proper error classification + */ + private function writeToSocket(string $data): int + { + $result = @parent::write($data); + + if ($result === false) { + $error = error_get_last(); + $errorMsg = $error['message'] ?? 'Unknown write error'; + + // Classify the error + if (str_contains($errorMsg, 'Broken pipe') || str_contains($errorMsg, 'EPIPE')) { + // Peer closed read side + if ($this->supportsHalfClose && !empty($this->onWritableEndCallbacks)) { + $this->emitWritableEnd(); + return 0; // Indicate no bytes written but don't throw + } else { + throw new ConnectionException(ConnectionAbortReason::PEER_READ_CLOSED, 'Peer closed read side'); + } + } elseif (str_contains($errorMsg, 'Connection reset') || str_contains($errorMsg, 'ECONNRESET')) { + throw new ConnectionException(ConnectionAbortReason::RESET, 'Connection reset by peer'); + } else { + throw new ConnectionException(ConnectionAbortReason::WRITE_FAILURE, 'Write failed: ' . $errorMsg); + } + } + + return $result; + } + /** * @return void */ @@ -377,19 +618,29 @@ private function startClearBuffer(): void { $this->onWriteable(function () { try { - $writeLength = parent::write($this->buffer); - } catch (Throwable) { - $this->close(); - $this->failAllWaiters(); - return; - } + $writeLength = $this->writeToSocket($this->buffer); - $this->buffer = substr($this->buffer, $writeLength); + if ($writeLength === 0 && !$this->isWriteOpen) { + // Write side closed during half-close + $this->clearBufferIsRunning = false; + $this->failAllWaiters(); + return; + } - if ($this->buffer === '') { - $this->cancelWriteable(); - $this->clearBufferIsRunning = false; - $this->resumeAllWaiters(); + $this->buffer = substr($this->buffer, $writeLength); + + if ($this->buffer === '') { + $this->cancelWriteable(); + $this->clearBufferIsRunning = false; + $this->resumeAllWaiters(); + } + } catch (AbortConnection $e) { + // Internal control flow exception - will be caught by onWriteable handler + throw $e; + } catch (Throwable $e) { + $this->close(); + $this->failAllWaiters(); + throw new TransportException('Buffer clear failed: ' . $e->getMessage(), 0, $e); } }); @@ -420,7 +671,7 @@ private function failAllWaiters(): void foreach ($waiters as $waiter) { \Ripple\Coroutine::throw( $waiter, - new ConnectionException('Unable to write to stream', ConnectionException::CONNECTION_WRITE_FAIL) + new TransportException('Unable to write to stream - connection closed') ); } } diff --git a/src/Stream/CloseEvent.php b/src/Stream/CloseEvent.php new file mode 100644 index 0000000..47f8c5e --- /dev/null +++ b/src/Stream/CloseEvent.php @@ -0,0 +1,33 @@ +timestamp = $timestamp ?? time(); + } +} diff --git a/src/Stream/ConnectionAbortReason.php b/src/Stream/ConnectionAbortReason.php new file mode 100644 index 0000000..96cd005 --- /dev/null +++ b/src/Stream/ConnectionAbortReason.php @@ -0,0 +1,30 @@ +value, 0, $previous); } } diff --git a/src/Stream/Exception/ConnectionTimeoutException.php b/src/Stream/Exception/ConnectionTimeoutException.php index 804cc28..1b45b7d 100644 --- a/src/Stream/Exception/ConnectionTimeoutException.php +++ b/src/Stream/Exception/ConnectionTimeoutException.php @@ -12,21 +12,18 @@ namespace Ripple\Stream\Exception; -use Ripple\Stream\StreamInterface; use Throwable; -class ConnectionTimeoutException extends ConnectionException +/** + * Timeout exception that can be handled by user code + * This represents a recoverable timeout condition + */ +class ConnectionTimeoutException extends TransportTimeoutException { public function __construct( - string $message = "", - Throwable $previous = null, - StreamInterface $stream = null, + string $message = "Connection timeout", + Throwable|null $previous = null, ) { - parent::__construct( - $message, - ConnectionException::CONNECTION_TIMEOUT, - $previous, - $stream, - ); + parent::__construct($message, 0, $previous); } } diff --git a/src/Stream/Exception/ConnectionCloseException.php b/src/Stream/Exception/StreamException.php similarity index 50% rename from src/Stream/Exception/ConnectionCloseException.php rename to src/Stream/Exception/StreamException.php index efd2258..25bcc01 100644 --- a/src/Stream/Exception/ConnectionCloseException.php +++ b/src/Stream/Exception/StreamException.php @@ -12,18 +12,11 @@ namespace Ripple\Stream\Exception; -use Psr\Http\Message\StreamInterface; -use Throwable; +use RuntimeException; -class ConnectionCloseException extends ConnectionException +/** + * Base exception for all stream-related exceptions that can be handled by user code + */ +class StreamException extends RuntimeException { - public function __construct(string $message = "", Throwable|null $previous = null, StreamInterface|null $stream = null) - { - parent::__construct( - $message, - ConnectionException::CONNECTION_CLOSED, - $previous, - $stream - ); - } } diff --git a/src/Stream/Exception/TransportException.php b/src/Stream/Exception/TransportException.php new file mode 100644 index 0000000..fedc711 --- /dev/null +++ b/src/Stream/Exception/TransportException.php @@ -0,0 +1,20 @@ +stream, $length); if ($content === false) { $this->close(); - throw new ConnectionException('Unable to read from stream', ConnectionException::CONNECTION_READ_FAIL); + throw new ConnectionException(ConnectionAbortReason::RESET, 'Unable to read from stream'); } return $content; } @@ -88,7 +88,7 @@ public function write(string $string): int $result = @fwrite($this->stream, $string); if ($result === false) { $this->close(); - throw new ConnectionException('Unable to write to stream', ConnectionException::CONNECTION_WRITE_FAIL); + throw new ConnectionException(ConnectionAbortReason::WRITE_FAILURE, 'Unable to write to stream'); } return $result; } diff --git a/src/Worker/Worker.php b/src/Worker/Worker.php index 3e9a0ee..94e47fb 100644 --- a/src/Worker/Worker.php +++ b/src/Worker/Worker.php @@ -15,7 +15,7 @@ use Closure; use JetBrains\PhpStorm\NoReturn; use Ripple\Socket; -use Ripple\Stream\Exception\ConnectionException; +use Ripple\Stream\Exception\TransportException; use Ripple\Utils\Output; use Ripple\Utils\Serialization\Zx7e; use Throwable; @@ -90,7 +90,7 @@ public function sc2m(Command $command): void { try { $this->parentSocket->write($this->zx7e->encodeFrame($command->__toString())); - } catch (ConnectionException $exception) { + } catch (TransportException $exception) { Output::error($exception->getMessage()); // Writing a message to the parent process fails. There is only one possibility that the parent process has exited. diff --git a/src/Worker/WorkerContext.php b/src/Worker/WorkerContext.php index e991b33..f4be6e9 100644 --- a/src/Worker/WorkerContext.php +++ b/src/Worker/WorkerContext.php @@ -19,12 +19,9 @@ use Ripple\Utils\Serialization\Zx7e; use function Co\async; -use function Co\delay; use function Co\process; use function socket_create_pair; use function socket_export_stream; -use function min; -use function pow; use function is_int; use const AF_INET; diff --git a/src/Worker/WorkerProcess.php b/src/Worker/WorkerProcess.php index 09fb6b4..9374579 100644 --- a/src/Worker/WorkerProcess.php +++ b/src/Worker/WorkerProcess.php @@ -14,7 +14,7 @@ use Ripple\Process\Runtime; use Ripple\Stream; -use Ripple\Stream\Exception\ConnectionException; +use Ripple\Stream\Exception\TransportException; use Ripple\Utils\Output; use Ripple\Utils\Serialization\Zx7e; @@ -71,7 +71,7 @@ public function command(Command $command): bool try { $this->stream->write($this->zx7e->encodeFrame($command->__toString())); return true; - } catch (ConnectionException $e) { + } catch (TransportException $e) { Output::warning($e->getMessage()); return false; } diff --git a/tests/UnixTest.php b/tests/UnixTest.php index a5d43d2..df0767e 100644 --- a/tests/UnixTest.php +++ b/tests/UnixTest.php @@ -5,7 +5,7 @@ use PHPUnit\Framework\Attributes\Test; use PHPUnit\Framework\TestCase; use Ripple\Socket; -use Ripple\Stream\Exception\ConnectionException; +use Ripple\Stream\Exception\TransportException; use Ripple\Utils\Output; use Throwable; @@ -47,7 +47,7 @@ public function test_unix(): void defer(function () use ($path) { try { $this->call($path); - } catch (ConnectionException $exception) { + } catch (TransportException $exception) { Output::error($exception->getMessage()); } });