From c673f981e4cbb05777f7e106111d77adb0321906 Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Sun, 31 Aug 2025 16:01:25 +0000 Subject: [PATCH 1/7] Refactor stream exception handling with improved lifecycle events Co-authored-by: jingnigg --- docs/EXCEPTION_HANDLING.md | 214 ++++++++++++ examples/enhanced-stream-usage.php | 67 ++++ examples/socket-client.php | 4 +- examples/socket-tunnel.php | 6 +- src/Channel/Channel.php | 20 +- src/Socket.php | 20 +- src/Stream.php | 308 +++++++++++++++--- src/Stream/CloseEvent.php | 29 ++ src/Stream/ConnectionAbortReason.php | 30 ++ src/Stream/Exception/AbortConnection.php | 23 ++ .../Exception/ConnectionCloseException.php | 29 -- src/Stream/Exception/ConnectionException.php | 36 +- .../Exception/ConnectionTimeoutException.php | 19 +- src/Stream/Exception/StreamException.php | 22 ++ src/Stream/Exception/TransportException.php | 20 ++ .../Exception/TransportTimeoutException.php | 20 ++ src/Stream/Exception/WriteClosedException.php | 21 ++ src/Stream/Stream.php | 4 +- 18 files changed, 779 insertions(+), 113 deletions(-) create mode 100644 docs/EXCEPTION_HANDLING.md create mode 100644 examples/enhanced-stream-usage.php create mode 100644 src/Stream/CloseEvent.php create mode 100644 src/Stream/ConnectionAbortReason.php create mode 100644 src/Stream/Exception/AbortConnection.php delete mode 100644 src/Stream/Exception/ConnectionCloseException.php create mode 100644 src/Stream/Exception/StreamException.php create mode 100644 src/Stream/Exception/TransportException.php create mode 100644 src/Stream/Exception/TransportTimeoutException.php create mode 100644 src/Stream/Exception/WriteClosedException.php diff --git a/docs/EXCEPTION_HANDLING.md b/docs/EXCEPTION_HANDLING.md new file mode 100644 index 0000000..da5766d --- /dev/null +++ b/docs/EXCEPTION_HANDLING.md @@ -0,0 +1,214 @@ +# Stream Exception Handling Guide + +## Overview + +The Stream module uses a clear exception hierarchy that separates internal control-flow exceptions from application-level exceptions that user code should handle. + +## Exception Hierarchy + +``` +RuntimeException +├── StreamException (base for user-catchable exceptions) +│ └── TransportException (recoverable transport errors) +│ ├── TransportTimeoutException (timeout errors) +│ ├── ConnectionTimeoutException (connection timeouts) +│ └── WriteClosedException (write to closed stream) +└── ConnectionException (internal control-flow - DO NOT CATCH) +``` + +## Key Principles + +### 1. Internal Control-Flow Exceptions (DO NOT CATCH) + +**ConnectionException** is an internal exception used exclusively by the reactor for connection termination. It implements the `AbortConnection` marker interface. + +- **Purpose**: Signal immediate connection termination to the reactor +- **Usage**: Only thrown internally when connection becomes unusable +- **Handling**: Only caught by reactor's exception boundary, never by user code + +```php +// ❌ NEVER do this +try { + $stream->read(1024); +} catch (ConnectionException $e) { + // This breaks the reactor's control flow! +} + +// ✅ Use connection lifecycle events instead +$stream->onClose(function (CloseEvent $event) { + echo "Connection closed: {$event->reason->value}"; +}); +``` + +### 2. Application-Level Exceptions (Safe to Catch) + +**TransportException** and its subclasses represent recoverable errors that application code can handle. + +```php +// ✅ Safe to catch and handle +try { + $stream->write($data); +} catch (WriteClosedException $e) { + // Write side is closed, but connection might still be readable + echo "Cannot write: {$e->getMessage()}"; +} catch (TransportException $e) { + // Other transport-level errors + echo "Transport error: {$e->getMessage()}"; +} +``` + +## Connection Lifecycle Events + +Instead of catching exceptions, use lifecycle events to handle connection state changes: + +### onClose(CloseEvent) +Triggered once when the connection terminates. + +```php +$stream->onClose(function (CloseEvent $event) { + echo "Closed: {$event->reason->value} by {$event->initiator}"; + // Cleanup resources, update connection pools, etc. +}); +``` + +**CloseEvent** provides: +- `reason`: ConnectionAbortReason enum (PEER_CLOSED, RESET, TIMEOUT, etc.) +- `initiator`: 'peer' | 'local' | 'system' +- `message`: Optional descriptive message +- `lastError`: Optional underlying exception +- `timestamp`: When the close occurred + +### onReadableEnd() +Triggered when the read side closes (EOF) but connection may still be writable. + +```php +$stream->onReadableEnd(function () use ($stream) { + echo "Read side closed - no more data from peer"; + // Can still write final response, then close + $stream->write("HTTP/1.1 200 OK\r\n\r\nGoodbye"); + $stream->close(); +}); +``` + +### onWritableEnd() +Triggered when the write side closes but connection may still be readable. + +```php +$stream->onWritableEnd(function () use ($stream) { + echo "Write side closed - cannot send more data"; + // Can still read remaining data from peer +}); +``` + +## Half-Close Support + +Half-close allows one side of a connection to be closed while the other remains open. This is useful for protocols like HTTP where the client sends a complete request, then the server sends a complete response. + +### Configuration + +```php +$stream = new Stream($resource); +$stream->supportsHalfClose = true; // Default: true +``` + +### Behavior + +When `supportsHalfClose = true`: +- `read()` returning EOF triggers `onReadableEnd()` if registered, otherwise throws `ConnectionException` +- `write()` getting EPIPE triggers `onWritableEnd()` if registered, otherwise throws `ConnectionException` + +When `supportsHalfClose = false`: +- EOF or EPIPE immediately throws `ConnectionException` for reactor termination + +## Error Classification + +### Fatal Errors (→ ConnectionException) +These errors indicate the connection is no longer usable: +- Peer closed connection (EOF without half-close support) +- Connection reset by peer (ECONNRESET) +- TLS fatal alerts +- Broken pipe (EPIPE without half-close support) + +### Recoverable Errors (→ TransportException) +These errors can be handled by application logic: +- Connection timeouts (can retry) +- Write to closed stream (can detect and handle) +- Protocol-level errors (application can decide response) +- Temporary resource unavailability + +## Migration from Old API + +### Before +```php +try { + $data = $stream->read(1024); +} catch (ConnectionException $e) { + if ($e->getCode() === ConnectionException::CONNECTION_CLOSED) { + // Handle close + } +} +``` + +### After +```php +// Use events for lifecycle management +$stream->onClose(function (CloseEvent $event) { + if ($event->reason === ConnectionAbortReason::PEER_CLOSED) { + // Handle close + } +}); + +$stream->onReadableEnd(function () { + // Handle EOF/half-close +}); + +// Only catch recoverable exceptions +try { + $data = $stream->read(1024); +} catch (TransportException $e) { + // Handle recoverable errors only +} +``` + +## Best Practices + +1. **Never catch ConnectionException** - use lifecycle events instead +2. **Register onClose for cleanup** - guaranteed to be called once per connection +3. **Use onReadableEnd/onWritableEnd** for half-close protocols +4. **Catch TransportException** for recoverable error handling +5. **Let the reactor handle fatal errors** - it will clean up and emit events + +## Common Patterns + +### HTTP Server +```php +$stream->onReadableEnd(function () use ($stream, $response) { + // Client finished sending request, send response + $stream->write($response); + $stream->close(); +}); + +$stream->onClose(function (CloseEvent $event) use ($connectionPool) { + $connectionPool->remove($stream); +}); +``` + +### Database Client +```php +$stream->onClose(function (CloseEvent $event) use ($pendingQueries) { + // Fail all pending queries + foreach ($pendingQueries as $query) { + $query->fail(new TransportException("Connection lost: {$event->reason->value}")); + } +}); +``` + +### WebSocket +```php +$stream->onClose(function (CloseEvent $event) use ($subscriptions) { + // Clean up subscriptions + foreach ($subscriptions as $sub) { + $sub->cancel(); + } +}); +``` \ No newline at end of file diff --git a/examples/enhanced-stream-usage.php b/examples/enhanced-stream-usage.php new file mode 100644 index 0000000..f574c57 --- /dev/null +++ b/examples/enhanced-stream-usage.php @@ -0,0 +1,67 @@ +setBlocking(false); + + // Register connection lifecycle events + $stream->onClose(function (CloseEvent $event) { + Output::info("Connection closed: {$event->reason->value} by {$event->initiator}"); + }); + + $stream->onReadableEnd(function () use ($stream) { + Output::info("Read side closed - server finished sending response"); + // We can still write if needed, but in HTTP we typically close now + $stream->close(); + }); + + // Send HTTP request + $request = "GET /get HTTP/1.1\r\nHost: httpbin.org\r\nConnection: close\r\n\r\n"; + $stream->write($request); + + // Read response + $stream->onReadable(function () use ($stream) { + try { + $data = $stream->read(1024); + if ($data !== '') { + echo $data; + } + // Note: When server closes connection, onReadableEnd will be triggered + // instead of an exception, allowing graceful handling + } catch (WriteClosedException $e) { + // This is a recoverable exception - safe to catch + Output::warning("Attempted to write to closed connection: {$e->getMessage()}"); + } catch (TransportException $e) { + // This is a recoverable exception - safe to catch + Output::error("Transport error: {$e->getMessage()}"); + $stream->close(); + } + // Note: We NEVER catch ConnectionException - that's handled internally by the reactor + }); + +} catch (TransportException $e) { + // Connection establishment failed - this is recoverable + Output::error("Failed to connect: {$e->getMessage()}"); + exit(1); +} + +wait(); \ No newline at end of file diff --git a/examples/socket-client.php b/examples/socket-client.php index 0c262fd..efab6a7 100644 --- a/examples/socket-client.php +++ b/examples/socket-client.php @@ -3,7 +3,7 @@ include __DIR__ . '/../vendor/autoload.php'; use Ripple\Socket; -use Ripple\Stream\Exception\ConnectionException; +use Ripple\Stream\Exception\TransportException; use Ripple\Utils\Output; use function Co\wait; @@ -14,7 +14,7 @@ # Enable SSL // $connection->enableSSL(); $connection->setBlocking(false); -} catch (ConnectionException $e) { +} catch (TransportException $e) { Output::warning($e->getMessage()); exit(1); } diff --git a/examples/socket-tunnel.php b/examples/socket-tunnel.php index 5ef60be..411402a 100644 --- a/examples/socket-tunnel.php +++ b/examples/socket-tunnel.php @@ -2,7 +2,7 @@ include __DIR__ . '/../vendor/autoload.php'; -use Ripple\Stream\Exception\ConnectionException; +use Ripple\Stream\Exception\TransportException; use Ripple\Tunnel\Socks5; use Ripple\Utils\Output; @@ -26,7 +26,7 @@ } echo $data; }); -} catch (ConnectionException $e) { +} catch (TransportException $e) { Output::warning($e->getMessage()); exit(1); } @@ -53,7 +53,7 @@ } echo $data; }); -} catch (ConnectionException $e) { +} catch (TransportException $e) { Output::warning($e->getMessage()); exit(1); } diff --git a/src/Channel/Channel.php b/src/Channel/Channel.php index 6f28487..17dba52 100644 --- a/src/Channel/Channel.php +++ b/src/Channel/Channel.php @@ -17,7 +17,8 @@ use Ripple\File\Lock; use Ripple\Kernel; use Ripple\Stream; -use Ripple\Stream\Exception\ConnectionException; +use Ripple\Stream\Exception\AbortConnection; +use Ripple\Stream\Exception\TransportException; use Ripple\Utils\Path; use Ripple\Utils\Serialization\Zx7e; use Throwable; @@ -193,14 +194,25 @@ public function receive(bool $blocking = true): mixed if ($this->readLock->lock(blocking: false)) { try { - if ($this->stream->read(1) === chr(Channel::FRAME_HEADER)) { + $header = $this->stream->read(1); + if ($header === chr(Channel::FRAME_HEADER)) { break; + } elseif ($header === '') { + // EOF - connection closed + $this->readLock->unlock(); + throw new TransportException('Connection closed while reading frame header'); } else { + // Invalid frame header $this->stream->close(); - throw new ConnectionException('Failed to read frame header.'); + throw new TransportException('Invalid frame header received'); } - } catch (ConnectionException) { + } catch (AbortConnection $e) { + // Internal control flow exception - re-throw to let reactor handle $this->readLock->unlock(); + throw $e; + } catch (TransportException $e) { + $this->readLock->unlock(); + throw $e; } } } diff --git a/src/Socket.php b/src/Socket.php index cb1bee7..ea722cb 100644 --- a/src/Socket.php +++ b/src/Socket.php @@ -14,7 +14,9 @@ use Closure; use Ripple\Coroutine\Exception\Exception; +use Ripple\Stream\ConnectionAbortReason; use Ripple\Stream\Exception\ConnectionException; +use Ripple\Stream\Exception\TransportException; use RuntimeException; use Throwable; @@ -143,7 +145,7 @@ public static function connect(string $address, int|float $timeout = 0, mixed $c ); if (!$connection) { - throw (new ConnectionException('Failed to connect to the server.', ConnectionException::CONNECTION_ERROR)); + throw new TransportException('Failed to connect to the server.'); } $stream = new static($connection, $address); @@ -153,7 +155,7 @@ public static function connect(string $address, int|float $timeout = 0, mixed $c return $stream; } catch (Throwable $e) { $stream->close(); - throw new ConnectionException($e->getMessage()); + throw new TransportException('Connection establishment failed: ' . $e->getMessage(), 0, $e); } } @@ -228,8 +230,8 @@ public function enableSSL( } })->await(); } catch (Throwable $exception) { - // ConnectionException - throw new ConnectionException('Failed to enable SSL.', ConnectionException::CONNECTION_CRYPTO, $exception); + // SSL handshake failure - this is a transport-level error, not internal control flow + throw new TransportException('Failed to enable SSL: ' . $exception->getMessage(), 0, $exception); } } @@ -299,8 +301,11 @@ public function receive(int $length, mixed &$target, int|null $flags = 0): int { $realLength = socket_recv($this->socket, $target, $length, $flags); if ($realLength === false) { - $this->close(); - throw new ConnectionException('Unable to read from stream', ConnectionException::CONNECTION_READ_FAIL); + throw new ConnectionException(ConnectionAbortReason::RESET, 'Unable to read from socket'); + } + if ($realLength === 0) { + // EOF on socket receive + throw new ConnectionException(ConnectionAbortReason::PEER_CLOSED, 'Peer closed connection'); } return $realLength; } @@ -316,8 +321,7 @@ public function write(string $string): int try { return $this->writeInternal($string); } catch (Throwable $exception) { - $this->close(); - throw new ConnectionException($exception->getMessage(), ConnectionException::CONNECTION_WRITE_FAIL); + throw new ConnectionException(ConnectionAbortReason::WRITE_FAILURE, 'Socket write failed: ' . $exception->getMessage(), $exception); } } diff --git a/src/Stream.php b/src/Stream.php index ee58355..d6eee97 100644 --- a/src/Stream.php +++ b/src/Stream.php @@ -16,9 +16,13 @@ use Revolt\EventLoop; use Ripple\Coroutine\Context; use Ripple\Coroutine\Coroutine; -use Ripple\Stream\Exception\ConnectionCloseException; +use Ripple\Stream\CloseEvent; +use Ripple\Stream\ConnectionAbortReason; +use Ripple\Stream\Exception\AbortConnection; use Ripple\Stream\Exception\ConnectionException; use Ripple\Stream\Exception\ConnectionTimeoutException; +use Ripple\Stream\Exception\TransportException; +use Ripple\Stream\Exception\WriteClosedException; use Ripple\Stream\Stream as StreamBase; use Ripple\Utils\Format; use Ripple\Utils\Output; @@ -37,24 +41,30 @@ use function end; /** - * 2024/09/21 + * Enhanced Stream class with proper connection lifecycle management * - * After production testing and the design architecture of the current framework, the following decisions can meet the needs of the existing design, - * so the following decisions are made: + * This class provides application-layer stream functionality with: + * - Event-driven I/O (onReadable/onWriteable) + * - Connection lifecycle events (onClose/onReadableEnd/onWritableEnd) + * - Half-close support for protocols that need it + * - Proper exception handling with clear separation between: + * - Internal control-flow exceptions (ConnectionException - DO NOT CATCH) + * - Application-level exceptions (TransportException and subclasses - safe to catch) * - * This class only focuses on the reliability of events and does not guarantee data integrity issues caused by write and buffer size. - * It is positioned as a Stream in the application layer. + * Connection Lifecycle Events: + * - onClose(CloseEvent): Triggered once when connection terminates, includes reason and initiator + * - onReadableEnd(): Triggered when read side closes (EOF) but write may continue (half-close) + * - onWritableEnd(): Triggered when write side closes but read may continue * - * Standards that are as safe and easy to use as possible should be followed, allowing some performance to be lost. For more - * fine-grained control, please use the StreamBase class. + * Exception Handling: + * - ConnectionException: Internal reactor control-flow exception - NEVER catch this in user code + * - TransportException: Recoverable transport errors - safe to catch and handle + * - WriteClosedException: Attempt to write to closed write side - safe to catch * - * Provide onReadable/onWriteable methods for monitoring readable and writeable events, and any uncaught ConnectionException - * that occurs in the event will cause the Stream to close - * - * Both the onReadable and onWritable methods will automatically cancel the previous monitoring. - * - * The closed stream will automatically log out all monitored events. If there is a transaction, it will automatically - * mark the transaction as failed. + * Half-Close Support: + * - When supportsHalfClose=true and onReadableEnd/onWritableEnd callbacks are registered, + * EOF conditions trigger the respective end events instead of immediate connection termination + * - This allows protocols like HTTP to continue writing responses after reading the complete request * * @Author cclilshy * @Date 2024/8/16 09:37 @@ -76,11 +86,41 @@ class Stream extends StreamBase */ protected array $onCloseCallbacks = array(); + /** + * @var array + */ + protected array $onReadableEndCallbacks = array(); + + /** + * @var array + */ + protected array $onWritableEndCallbacks = array(); + /** * @var int */ protected int $index = 0; + /** + * @var bool + */ + protected bool $isReadOpen = true; + + /** + * @var bool + */ + protected bool $isWriteOpen = true; + + /** + * @var bool + */ + protected bool $isClosing = false; + + /** + * @var bool + */ + protected bool $supportsHalfClose = true; + /** * @var int $id */ @@ -164,6 +204,12 @@ public function onReadable(Closure $closure): string return $this->onReadable = EventLoop::onReadable($this->stream, function () use ($closure) { try { call_user_func_array($closure, [$this, fn () => $this->cancelReadable()]); + } catch (AbortConnection $e) { + // Internal control-flow exception - close connection immediately + $this->cancelReadable(); + $this->cancelWriteable(); + parent::close(); + $this->emitCloseEvent($e->reason, 'system', $e); } catch (Throwable $exception) { Output::error($exception->getMessage()); } @@ -202,6 +248,114 @@ public function cancelOnClose(string $key): void unset($this->onCloseCallbacks[$key]); } + /** + * Register callback for readable end event (EOF/half-close) + * + * @param Closure $closure + * @return string + */ + public function onReadableEnd(Closure $closure): string + { + $this->onReadableEndCallbacks[$key = Format::int2string($this->index++)] = $closure; + return $key; + } + + /** + * @param string $key + * @return void + */ + public function cancelOnReadableEnd(string $key): void + { + unset($this->onReadableEndCallbacks[$key]); + } + + /** + * Register callback for writable end event (write side closed) + * + * @param Closure $closure + * @return string + */ + public function onWritableEnd(Closure $closure): string + { + $this->onWritableEndCallbacks[$key = Format::int2string($this->index++)] = $closure; + return $key; + } + + /** + * @param string $key + * @return void + */ + public function cancelOnWritableEnd(string $key): void + { + unset($this->onWritableEndCallbacks[$key]); + } + + /** + * @internal + * Emit readable end event (EOF/half-close) + */ + private function emitReadableEnd(): void + { + if (!$this->isReadOpen) { + return; // Already emitted + } + + $this->isReadOpen = false; + $this->cancelReadable(); + + foreach ($this->onReadableEndCallbacks as $callback) { + try { + call_user_func($callback, $this); + } catch (Throwable $exception) { + Output::error("Error in onReadableEnd callback: " . $exception->getMessage()); + } + } + } + + /** + * @internal + * Emit writable end event (write side closed) + */ + private function emitWritableEnd(): void + { + if (!$this->isWriteOpen) { + return; // Already emitted + } + + $this->isWriteOpen = false; + $this->cancelWriteable(); + + foreach ($this->onWritableEndCallbacks as $callback) { + try { + call_user_func($callback, $this); + } catch (Throwable $exception) { + Output::error("Error in onWritableEnd callback: " . $exception->getMessage()); + } + } + } + + /** + * @internal + * Emit close event with reason + */ + private function emitCloseEvent(ConnectionAbortReason $reason, string $initiator, ?Throwable $lastError = null): void + { + if ($this->isClosing) { + return; // Already closing + } + + $this->isClosing = true; + $closeEvent = new CloseEvent($reason, $initiator, null, $lastError); + + foreach ($this->onCloseCallbacks as $callback) { + try { + call_user_func($callback, $closeEvent); + } catch (Throwable $exception) { + Output::error("Error in onClose callback: " . $exception->getMessage()); + } + } + } + /** * @return void */ @@ -217,13 +371,8 @@ public function close(): void $this->cancelReadable(); $this->cancelWriteable(); - foreach ($this->onCloseCallbacks as $callback) { - try { - call_user_func($callback); - } catch (Throwable $exception) { - Output::error($exception->getMessage()); - } - } + // Emit close event with local initiator + $this->emitCloseEvent(ConnectionAbortReason::LOCAL_CLOSE, 'local'); } /** @@ -306,12 +455,48 @@ public function onWriteable(Closure $closure): string return $this->onWriteable = EventLoop::onWritable($this->stream, function () use ($closure) { try { call_user_func_array($closure, [$this, fn () => $this->cancelWriteable()]); + } catch (AbortConnection $e) { + // Internal control-flow exception - close connection immediately + $this->cancelReadable(); + $this->cancelWriteable(); + parent::close(); + $this->emitCloseEvent($e->reason, 'system', $e); } catch (Throwable $exception) { Output::error($exception->getMessage()); } }); } + /** + * Override parent read method to support half-close detection + * + * @param int $length + * @return string + * @throws ConnectionException + */ + public function read(int $length): string + { + $content = @fread($this->stream, $length); + if ($content === false) { + // Fatal I/O error - throw internal exception for immediate termination + throw new ConnectionException(ConnectionAbortReason::RESET, 'Unable to read from stream'); + } + + if ($content === '' && feof($this->stream)) { + // EOF detected - handle based on half-close support + if ($this->supportsHalfClose && !empty($this->onReadableEndCallbacks)) { + // Half-close supported and user has registered onReadableEnd + $this->emitReadableEnd(); + return $content; // Return empty string, don't throw + } else { + // No half-close support or no onReadableEnd handler - treat as fatal + throw new ConnectionException(ConnectionAbortReason::PEER_CLOSED, 'Peer closed connection'); + } + } + + return $content; + } + /** * @param int $length * @@ -343,15 +528,20 @@ public function readContinuously(int $length): string * @param string $string * * @return int - * @throws ConnectionException + * @throws ConnectionException|WriteClosedException */ public function write(string $string): int { + // Check if write side is already closed + if (!$this->isWriteOpen) { + throw new WriteClosedException('Write side of connection is closed'); + } + $this->buffer .= $string; try { if (!$this->clearBufferIsRunning) { - $writeLength = parent::write($this->buffer); + $writeLength = $this->writeToSocket($this->buffer); $this->buffer = substr($this->buffer, $writeLength); if ($this->buffer === '') { @@ -364,10 +554,44 @@ public function write(string $string): int $this->clearBufferWaiters[] = getContext(); Coroutine::suspend(end($this->clearBufferWaiters)); return strlen($string); - } catch (Throwable) { + } catch (AbortConnection $e) { + // Re-throw internal control flow exception + throw $e; + } catch (Throwable $e) { $this->close(); - throw new ConnectionException('Unable to write to stream', ConnectionException::CONNECTION_WRITE_FAIL); + throw new TransportException('Write operation failed: ' . $e->getMessage(), 0, $e); + } + } + + /** + * @internal + * Handle low-level write with proper error classification + */ + private function writeToSocket(string $data): int + { + $result = @parent::write($data); + + if ($result === false) { + $error = error_get_last(); + $errorMsg = $error['message'] ?? 'Unknown write error'; + + // Classify the error + if (str_contains($errorMsg, 'Broken pipe') || str_contains($errorMsg, 'EPIPE')) { + // Peer closed read side + if ($this->supportsHalfClose && !empty($this->onWritableEndCallbacks)) { + $this->emitWritableEnd(); + return 0; // Indicate no bytes written but don't throw + } else { + throw new ConnectionException(ConnectionAbortReason::PEER_READ_CLOSED, 'Peer closed read side'); + } + } elseif (str_contains($errorMsg, 'Connection reset') || str_contains($errorMsg, 'ECONNRESET')) { + throw new ConnectionException(ConnectionAbortReason::RESET, 'Connection reset by peer'); + } else { + throw new ConnectionException(ConnectionAbortReason::WRITE_FAILURE, 'Write failed: ' . $errorMsg); + } } + + return $result; } /** @@ -377,19 +601,29 @@ private function startClearBuffer(): void { $this->onWriteable(function () { try { - $writeLength = parent::write($this->buffer); - } catch (Throwable) { + $writeLength = $this->writeToSocket($this->buffer); + + if ($writeLength === 0 && !$this->isWriteOpen) { + // Write side closed during half-close + $this->clearBufferIsRunning = false; + $this->failAllWaiters(); + return; + } + + $this->buffer = substr($this->buffer, $writeLength); + + if ($this->buffer === '') { + $this->cancelWriteable(); + $this->clearBufferIsRunning = false; + $this->resumeAllWaiters(); + } + } catch (AbortConnection $e) { + // Internal control flow exception - will be caught by onWriteable handler + throw $e; + } catch (Throwable $e) { $this->close(); $this->failAllWaiters(); - return; - } - - $this->buffer = substr($this->buffer, $writeLength); - - if ($this->buffer === '') { - $this->cancelWriteable(); - $this->clearBufferIsRunning = false; - $this->resumeAllWaiters(); + throw new TransportException('Buffer clear failed: ' . $e->getMessage(), 0, $e); } }); diff --git a/src/Stream/CloseEvent.php b/src/Stream/CloseEvent.php new file mode 100644 index 0000000..7cbffb2 --- /dev/null +++ b/src/Stream/CloseEvent.php @@ -0,0 +1,29 @@ +timestamp ??= time(); + } +} \ No newline at end of file diff --git a/src/Stream/ConnectionAbortReason.php b/src/Stream/ConnectionAbortReason.php new file mode 100644 index 0000000..86a340b --- /dev/null +++ b/src/Stream/ConnectionAbortReason.php @@ -0,0 +1,30 @@ +value, 0, $previous); } } diff --git a/src/Stream/Exception/ConnectionTimeoutException.php b/src/Stream/Exception/ConnectionTimeoutException.php index 804cc28..7258c20 100644 --- a/src/Stream/Exception/ConnectionTimeoutException.php +++ b/src/Stream/Exception/ConnectionTimeoutException.php @@ -12,21 +12,18 @@ namespace Ripple\Stream\Exception; -use Ripple\Stream\StreamInterface; use Throwable; -class ConnectionTimeoutException extends ConnectionException +/** + * Timeout exception that can be handled by user code + * This represents a recoverable timeout condition + */ +class ConnectionTimeoutException extends TransportTimeoutException { public function __construct( - string $message = "", - Throwable $previous = null, - StreamInterface $stream = null, + string $message = "Connection timeout", + ?Throwable $previous = null, ) { - parent::__construct( - $message, - ConnectionException::CONNECTION_TIMEOUT, - $previous, - $stream, - ); + parent::__construct($message, 0, $previous); } } diff --git a/src/Stream/Exception/StreamException.php b/src/Stream/Exception/StreamException.php new file mode 100644 index 0000000..0b97e0c --- /dev/null +++ b/src/Stream/Exception/StreamException.php @@ -0,0 +1,22 @@ +stream, $length); if ($content === false) { $this->close(); - throw new ConnectionException('Unable to read from stream', ConnectionException::CONNECTION_READ_FAIL); + throw new ConnectionException(ConnectionAbortReason::RESET, 'Unable to read from stream'); } return $content; } @@ -88,7 +88,7 @@ public function write(string $string): int $result = @fwrite($this->stream, $string); if ($result === false) { $this->close(); - throw new ConnectionException('Unable to write to stream', ConnectionException::CONNECTION_WRITE_FAIL); + throw new ConnectionException(ConnectionAbortReason::WRITE_FAILURE, 'Unable to write to stream'); } return $result; } From 337a73cd13ef745e469dd2c622964097caad6dee Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Sun, 31 Aug 2025 16:13:13 +0000 Subject: [PATCH 2/7] Refactor stream exceptions and improve error handling in transport layer Co-authored-by: jingnigg --- src/Proc/Session.php | 4 ++-- src/Socket.php | 5 +++-- src/Stream.php | 29 +++++++++++++++++++------- src/Stream/CloseEvent.php | 4 ++-- src/Worker/Worker.php | 4 ++-- src/Worker/WorkerProcess.php | 4 ++-- test_fix.php | 40 ++++++++++++++++++++++++++++++++++++ tests/UnixTest.php | 4 ++-- 8 files changed, 75 insertions(+), 19 deletions(-) create mode 100644 test_fix.php diff --git a/src/Proc/Session.php b/src/Proc/Session.php index 78777a2..021d6db 100644 --- a/src/Proc/Session.php +++ b/src/Proc/Session.php @@ -14,7 +14,7 @@ use Closure; use Ripple\Kernel; -use Ripple\Stream\Exception\ConnectionException; +use Ripple\Stream\Exception\TransportException; use Ripple\Utils\Output; use Throwable; @@ -209,7 +209,7 @@ public function write(string $content): bool try { $this->streamStdInput->write($content); return true; - } catch (ConnectionException $exception) { + } catch (TransportException $exception) { Output::error($exception->getMessage()); $this->close(); return false; diff --git a/src/Socket.php b/src/Socket.php index ea722cb..f7afe20 100644 --- a/src/Socket.php +++ b/src/Socket.php @@ -17,6 +17,7 @@ use Ripple\Stream\ConnectionAbortReason; use Ripple\Stream\Exception\ConnectionException; use Ripple\Stream\Exception\TransportException; +use Ripple\Stream\Exception\TransportTimeoutException; use RuntimeException; use Throwable; @@ -179,7 +180,7 @@ public function enableSSL( if ($timeout > 0) { $timeoutEventID = delay(function () use ($reject) { $this->close(); - $reject(new ConnectionException('SSL handshake timeout.', ConnectionException::CONNECTION_TIMEOUT)); + $reject(new TransportTimeoutException('SSL handshake timeout')); }, $timeout); $promise->finally(static fn () => cancel($timeoutEventID)); } @@ -192,7 +193,7 @@ public function enableSSL( if ($handshakeResult === false) { $stream->close(); - $reject(new ConnectionException('Failed to enable crypto.', ConnectionException::CONNECTION_CRYPTO)); + $reject(new TransportException('Failed to enable crypto')); return; } diff --git a/src/Stream.php b/src/Stream.php index d6eee97..14cf8ab 100644 --- a/src/Stream.php +++ b/src/Stream.php @@ -33,6 +33,8 @@ use function Co\async; use function Co\cancel; use function Co\getContext; +use function feof; +use function fread; use function is_resource; use function stream_set_blocking; use function get_resource_id; @@ -169,7 +171,7 @@ public function waitForReadable(int|float $timeout = 0): bool $closeOID = $this->onClose(static function () use ($context) { Coroutine::throw( $context, - new ConnectionCloseException('Stream has been closed', null) + new TransportException('Stream has been closed') ); }); @@ -209,7 +211,12 @@ public function onReadable(Closure $closure): string $this->cancelReadable(); $this->cancelWriteable(); parent::close(); - $this->emitCloseEvent($e->reason, 'system', $e); + + // Extract reason from ConnectionException if available + $reason = ($e instanceof ConnectionException) + ? $e->reason + : ConnectionAbortReason::RESET; + $this->emitCloseEvent($reason, 'system', $e); } catch (Throwable $exception) { Output::error($exception->getMessage()); } @@ -418,7 +425,7 @@ public function waitForWriteable(int|float $timeout = 0): bool $closeOID = $this->onClose(static function () use ($context) { Coroutine::throw( $context, - new ConnectionCloseException('Stream has been closed') + new TransportException('Stream has been closed') ); }); @@ -460,7 +467,12 @@ public function onWriteable(Closure $closure): string $this->cancelReadable(); $this->cancelWriteable(); parent::close(); - $this->emitCloseEvent($e->reason, 'system', $e); + + // Extract reason from ConnectionException if available + $reason = ($e instanceof ConnectionException) + ? $e->reason + : ConnectionAbortReason::RESET; + $this->emitCloseEvent($reason, 'system', $e); } catch (Throwable $exception) { Output::error($exception->getMessage()); } @@ -551,8 +563,11 @@ public function write(string $string): int $this->startClearBuffer(); } - $this->clearBufferWaiters[] = getContext(); - Coroutine::suspend(end($this->clearBufferWaiters)); + // Only suspend if there's still buffered data to write + if ($this->buffer !== '') { + $this->clearBufferWaiters[] = getContext(); + Coroutine::suspend(end($this->clearBufferWaiters)); + } return strlen($string); } catch (AbortConnection $e) { // Re-throw internal control flow exception @@ -654,7 +669,7 @@ private function failAllWaiters(): void foreach ($waiters as $waiter) { \Ripple\Coroutine::throw( $waiter, - new ConnectionException('Unable to write to stream', ConnectionException::CONNECTION_WRITE_FAIL) + new TransportException('Unable to write to stream - connection closed') ); } } diff --git a/src/Stream/CloseEvent.php b/src/Stream/CloseEvent.php index 7cbffb2..4fbfb8c 100644 --- a/src/Stream/CloseEvent.php +++ b/src/Stream/CloseEvent.php @@ -22,8 +22,8 @@ public function __construct( public string $initiator, // 'peer' | 'local' | 'system' public ?string $message = null, public ?\Throwable $lastError = null, - public ?int $timestamp = null + public int $timestamp = null ) { - $this->timestamp ??= time(); + $this->timestamp = $timestamp ?? time(); } } \ No newline at end of file diff --git a/src/Worker/Worker.php b/src/Worker/Worker.php index 3e9a0ee..94e47fb 100644 --- a/src/Worker/Worker.php +++ b/src/Worker/Worker.php @@ -15,7 +15,7 @@ use Closure; use JetBrains\PhpStorm\NoReturn; use Ripple\Socket; -use Ripple\Stream\Exception\ConnectionException; +use Ripple\Stream\Exception\TransportException; use Ripple\Utils\Output; use Ripple\Utils\Serialization\Zx7e; use Throwable; @@ -90,7 +90,7 @@ public function sc2m(Command $command): void { try { $this->parentSocket->write($this->zx7e->encodeFrame($command->__toString())); - } catch (ConnectionException $exception) { + } catch (TransportException $exception) { Output::error($exception->getMessage()); // Writing a message to the parent process fails. There is only one possibility that the parent process has exited. diff --git a/src/Worker/WorkerProcess.php b/src/Worker/WorkerProcess.php index 09fb6b4..9374579 100644 --- a/src/Worker/WorkerProcess.php +++ b/src/Worker/WorkerProcess.php @@ -14,7 +14,7 @@ use Ripple\Process\Runtime; use Ripple\Stream; -use Ripple\Stream\Exception\ConnectionException; +use Ripple\Stream\Exception\TransportException; use Ripple\Utils\Output; use Ripple\Utils\Serialization\Zx7e; @@ -71,7 +71,7 @@ public function command(Command $command): bool try { $this->stream->write($this->zx7e->encodeFrame($command->__toString())); return true; - } catch (ConnectionException $e) { + } catch (TransportException $e) { Output::warning($e->getMessage()); return false; } diff --git a/test_fix.php b/test_fix.php new file mode 100644 index 0000000..592547d --- /dev/null +++ b/test_fix.php @@ -0,0 +1,40 @@ +reason->value}\n"; + echo " Initiator: {$closeEvent->initiator}\n"; + echo " Timestamp: {$closeEvent->timestamp}\n"; +} catch (Throwable $e) { + echo "❌ CloseEvent creation failed: {$e->getMessage()}\n"; + echo " File: {$e->getFile()}:{$e->getLine()}\n"; +} + +echo "\nTesting ConnectionException creation...\n"; + +try { + // Test ConnectionException creation (internal use only) + $exception = new ConnectionException(ConnectionAbortReason::RESET, 'Test reset'); + echo "✅ ConnectionException created successfully\n"; + echo " Reason: {$exception->reason->value}\n"; + echo " Message: {$exception->getMessage()}\n"; +} catch (Throwable $e) { + echo "❌ ConnectionException creation failed: {$e->getMessage()}\n"; +} + +echo "\nAll basic tests completed!\n"; \ No newline at end of file diff --git a/tests/UnixTest.php b/tests/UnixTest.php index a5d43d2..df0767e 100644 --- a/tests/UnixTest.php +++ b/tests/UnixTest.php @@ -5,7 +5,7 @@ use PHPUnit\Framework\Attributes\Test; use PHPUnit\Framework\TestCase; use Ripple\Socket; -use Ripple\Stream\Exception\ConnectionException; +use Ripple\Stream\Exception\TransportException; use Ripple\Utils\Output; use Throwable; @@ -47,7 +47,7 @@ public function test_unix(): void defer(function () use ($path) { try { $this->call($path); - } catch (ConnectionException $exception) { + } catch (TransportException $exception) { Output::error($exception->getMessage()); } }); From 023474868e6afee701d9a64dd763f18010c9fcfb Mon Sep 17 00:00:00 2001 From: cclilshy Date: Mon, 1 Sep 2025 00:30:03 +0800 Subject: [PATCH 3/7] style: format --- examples/enhanced-stream-usage.php | 14 +++---- src/Stream.php | 42 ++++++++++--------- src/Stream/CloseEvent.php | 8 +++- src/Stream/ConnectionAbortReason.php | 2 +- src/Stream/Exception/AbortConnection.php | 4 +- src/Stream/Exception/ConnectionException.php | 8 ++-- src/Stream/Exception/StreamException.php | 2 +- src/Stream/Exception/TransportException.php | 2 +- .../Exception/TransportTimeoutException.php | 2 +- src/Stream/Exception/WriteClosedException.php | 2 +- test_fix.php | 2 +- 11 files changed, 47 insertions(+), 41 deletions(-) diff --git a/examples/enhanced-stream-usage.php b/examples/enhanced-stream-usage.php index f574c57..a205ef0 100644 --- a/examples/enhanced-stream-usage.php +++ b/examples/enhanced-stream-usage.php @@ -12,7 +12,7 @@ /** * Example demonstrating proper usage of the enhanced Stream API - * + * * This example shows: * 1. How to handle connection lifecycle events (onClose, onReadableEnd) * 2. Proper exception handling (catch TransportException, not ConnectionException) @@ -22,22 +22,22 @@ try { $stream = Socket::connect('tcp://httpbin.org:80'); $stream->setBlocking(false); - + // Register connection lifecycle events $stream->onClose(function (CloseEvent $event) { Output::info("Connection closed: {$event->reason->value} by {$event->initiator}"); }); - + $stream->onReadableEnd(function () use ($stream) { Output::info("Read side closed - server finished sending response"); // We can still write if needed, but in HTTP we typically close now $stream->close(); }); - + // Send HTTP request $request = "GET /get HTTP/1.1\r\nHost: httpbin.org\r\nConnection: close\r\n\r\n"; $stream->write($request); - + // Read response $stream->onReadable(function () use ($stream) { try { @@ -57,11 +57,11 @@ } // Note: We NEVER catch ConnectionException - that's handled internally by the reactor }); - + } catch (TransportException $e) { // Connection establishment failed - this is recoverable Output::error("Failed to connect: {$e->getMessage()}"); exit(1); } -wait(); \ No newline at end of file +wait(); diff --git a/src/Stream.php b/src/Stream.php index 14cf8ab..16ba535 100644 --- a/src/Stream.php +++ b/src/Stream.php @@ -41,13 +41,15 @@ use function substr; use function strlen; use function end; +use function error_get_last; +use function str_contains; /** * Enhanced Stream class with proper connection lifecycle management * * This class provides application-layer stream functionality with: * - Event-driven I/O (onReadable/onWriteable) - * - Connection lifecycle events (onClose/onReadableEnd/onWritableEnd) + * - Connection lifecycle events (onClose/onReadableEnd/onWritableEnd) * - Half-close support for protocols that need it * - Proper exception handling with clear separation between: * - Internal control-flow exceptions (ConnectionException - DO NOT CATCH) @@ -211,10 +213,10 @@ public function onReadable(Closure $closure): string $this->cancelReadable(); $this->cancelWriteable(); parent::close(); - + // Extract reason from ConnectionException if available - $reason = ($e instanceof ConnectionException) - ? $e->reason + $reason = ($e instanceof ConnectionException) + ? $e->reason : ConnectionAbortReason::RESET; $this->emitCloseEvent($reason, 'system', $e); } catch (Throwable $exception) { @@ -306,10 +308,10 @@ private function emitReadableEnd(): void if (!$this->isReadOpen) { return; // Already emitted } - + $this->isReadOpen = false; $this->cancelReadable(); - + foreach ($this->onReadableEndCallbacks as $callback) { try { call_user_func($callback, $this); @@ -328,10 +330,10 @@ private function emitWritableEnd(): void if (!$this->isWriteOpen) { return; // Already emitted } - + $this->isWriteOpen = false; $this->cancelWriteable(); - + foreach ($this->onWritableEndCallbacks as $callback) { try { call_user_func($callback, $this); @@ -350,10 +352,10 @@ private function emitCloseEvent(ConnectionAbortReason $reason, string $initiator if ($this->isClosing) { return; // Already closing } - + $this->isClosing = true; $closeEvent = new CloseEvent($reason, $initiator, null, $lastError); - + foreach ($this->onCloseCallbacks as $callback) { try { call_user_func($callback, $closeEvent); @@ -467,10 +469,10 @@ public function onWriteable(Closure $closure): string $this->cancelReadable(); $this->cancelWriteable(); parent::close(); - + // Extract reason from ConnectionException if available - $reason = ($e instanceof ConnectionException) - ? $e->reason + $reason = ($e instanceof ConnectionException) + ? $e->reason : ConnectionAbortReason::RESET; $this->emitCloseEvent($reason, 'system', $e); } catch (Throwable $exception) { @@ -493,7 +495,7 @@ public function read(int $length): string // Fatal I/O error - throw internal exception for immediate termination throw new ConnectionException(ConnectionAbortReason::RESET, 'Unable to read from stream'); } - + if ($content === '' && feof($this->stream)) { // EOF detected - handle based on half-close support if ($this->supportsHalfClose && !empty($this->onReadableEndCallbacks)) { @@ -505,7 +507,7 @@ public function read(int $length): string throw new ConnectionException(ConnectionAbortReason::PEER_CLOSED, 'Peer closed connection'); } } - + return $content; } @@ -585,11 +587,11 @@ public function write(string $string): int private function writeToSocket(string $data): int { $result = @parent::write($data); - + if ($result === false) { $error = error_get_last(); $errorMsg = $error['message'] ?? 'Unknown write error'; - + // Classify the error if (str_contains($errorMsg, 'Broken pipe') || str_contains($errorMsg, 'EPIPE')) { // Peer closed read side @@ -605,7 +607,7 @@ private function writeToSocket(string $data): int throw new ConnectionException(ConnectionAbortReason::WRITE_FAILURE, 'Write failed: ' . $errorMsg); } } - + return $result; } @@ -617,14 +619,14 @@ private function startClearBuffer(): void $this->onWriteable(function () { try { $writeLength = $this->writeToSocket($this->buffer); - + if ($writeLength === 0 && !$this->isWriteOpen) { // Write side closed during half-close $this->clearBufferIsRunning = false; $this->failAllWaiters(); return; } - + $this->buffer = substr($this->buffer, $writeLength); if ($this->buffer === '') { diff --git a/src/Stream/CloseEvent.php b/src/Stream/CloseEvent.php index 4fbfb8c..5c35798 100644 --- a/src/Stream/CloseEvent.php +++ b/src/Stream/CloseEvent.php @@ -12,6 +12,10 @@ namespace Ripple\Stream; +use Throwable; + +use function time; + /** * Event object containing information about connection closure */ @@ -21,9 +25,9 @@ public function __construct( public ConnectionAbortReason $reason, public string $initiator, // 'peer' | 'local' | 'system' public ?string $message = null, - public ?\Throwable $lastError = null, + public ?Throwable $lastError = null, public int $timestamp = null ) { $this->timestamp = $timestamp ?? time(); } -} \ No newline at end of file +} diff --git a/src/Stream/ConnectionAbortReason.php b/src/Stream/ConnectionAbortReason.php index 86a340b..96cd005 100644 --- a/src/Stream/ConnectionAbortReason.php +++ b/src/Stream/ConnectionAbortReason.php @@ -27,4 +27,4 @@ enum ConnectionAbortReason: string case PROTOCOL_ERROR = 'protocol_error'; case WRITE_FAILURE = 'write_failure'; case HANDSHAKE_FAILURE = 'handshake_failure'; -} \ No newline at end of file +} diff --git a/src/Stream/Exception/AbortConnection.php b/src/Stream/Exception/AbortConnection.php index 6e09219..49079f9 100644 --- a/src/Stream/Exception/AbortConnection.php +++ b/src/Stream/Exception/AbortConnection.php @@ -14,10 +14,10 @@ /** * @internal - * + * * Marker interface for internal exceptions that should trigger immediate connection termination. * Only the reactor's exception boundary should catch exceptions implementing this interface. */ interface AbortConnection { -} \ No newline at end of file +} diff --git a/src/Stream/Exception/ConnectionException.php b/src/Stream/Exception/ConnectionException.php index 26d49f8..d63efbb 100644 --- a/src/Stream/Exception/ConnectionException.php +++ b/src/Stream/Exception/ConnectionException.php @@ -18,17 +18,17 @@ /** * @internal - * + * * Internal control-flow exception used exclusively by the reactor to terminate connections. * This exception should NEVER be caught by user code or extended by application-level exceptions. - * + * * When this exception is thrown, it signals that the connection must be immediately closed * and all related event monitoring must be cancelled. The reactor's exception boundary * will catch this exception, perform cleanup, and emit onClose events. - * + * * User code should use onClose, onReadableEnd, and onWritableEnd events to handle * connection lifecycle events instead of catching this exception. - * + * * @Author cclilshy * @Date 2024/8/16 09:37 */ diff --git a/src/Stream/Exception/StreamException.php b/src/Stream/Exception/StreamException.php index 0b97e0c..25bcc01 100644 --- a/src/Stream/Exception/StreamException.php +++ b/src/Stream/Exception/StreamException.php @@ -19,4 +19,4 @@ */ class StreamException extends RuntimeException { -} \ No newline at end of file +} diff --git a/src/Stream/Exception/TransportException.php b/src/Stream/Exception/TransportException.php index 1af6428..fedc711 100644 --- a/src/Stream/Exception/TransportException.php +++ b/src/Stream/Exception/TransportException.php @@ -17,4 +17,4 @@ */ class TransportException extends StreamException { -} \ No newline at end of file +} diff --git a/src/Stream/Exception/TransportTimeoutException.php b/src/Stream/Exception/TransportTimeoutException.php index 8dd7b2f..8d0e2d9 100644 --- a/src/Stream/Exception/TransportTimeoutException.php +++ b/src/Stream/Exception/TransportTimeoutException.php @@ -17,4 +17,4 @@ */ class TransportTimeoutException extends TransportException { -} \ No newline at end of file +} diff --git a/src/Stream/Exception/WriteClosedException.php b/src/Stream/Exception/WriteClosedException.php index 0018cb9..1daa060 100644 --- a/src/Stream/Exception/WriteClosedException.php +++ b/src/Stream/Exception/WriteClosedException.php @@ -18,4 +18,4 @@ */ class WriteClosedException extends TransportException { -} \ No newline at end of file +} diff --git a/test_fix.php b/test_fix.php index 592547d..8d2c049 100644 --- a/test_fix.php +++ b/test_fix.php @@ -37,4 +37,4 @@ echo "❌ ConnectionException creation failed: {$e->getMessage()}\n"; } -echo "\nAll basic tests completed!\n"; \ No newline at end of file +echo "\nAll basic tests completed!\n"; From c22d538dafa9d10707f8e84ebfa48912f74934c3 Mon Sep 17 00:00:00 2001 From: cclilshy Date: Wed, 10 Sep 2025 17:27:11 +0800 Subject: [PATCH 4/7] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8D=E5=AF=B9readonly?= =?UTF-8?q?=E5=B1=9E=E6=80=A7=E5=86=99=E5=AF=BC=E8=87=B4=E5=BC=82=E5=B8=B8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/File/File.php | 11 +++++++---- src/Kernel.php | 4 ++-- src/Stream/CloseEvent.php | 4 ++-- 3 files changed, 11 insertions(+), 8 deletions(-) diff --git a/src/File/File.php b/src/File/File.php index d4385ed..e472a8f 100644 --- a/src/File/File.php +++ b/src/File/File.php @@ -22,6 +22,7 @@ use function array_shift; use function Co\forked; use function Co\getContext; +use function Co\thread; use function fopen; use function file_get_contents; @@ -63,11 +64,13 @@ private function registerOnFork(): void */ public static function getContents(string $path): string|false { - if (Kernel::getInstance()->getLibEventMethod() === 'epoll') { - return file_get_contents($path); - } - try { + $kernel = Kernel::getInstance(); + $libEventMethod = $kernel->getLibEventMethod(); + if (!$libEventMethod || $libEventMethod === 'epoll') { + return file_get_contents($path); + } + if (!$resource = fopen($path, 'r')) { throw (new FileException('Failed to open file: ' . $path)); } diff --git a/src/Kernel.php b/src/Kernel.php index 80a5a66..ec059f1 100644 --- a/src/Kernel.php +++ b/src/Kernel.php @@ -59,8 +59,8 @@ class Kernel /*** @var bool */ private bool $mainRunning = true; - /*** @var string */ - private string $libEventMethod; + /*** @var string|null */ + private string|null $libEventMethod = null; /** * diff --git a/src/Stream/CloseEvent.php b/src/Stream/CloseEvent.php index 5c35798..8db7293 100644 --- a/src/Stream/CloseEvent.php +++ b/src/Stream/CloseEvent.php @@ -19,14 +19,14 @@ /** * Event object containing information about connection closure */ -readonly class CloseEvent +class CloseEvent { public function __construct( public ConnectionAbortReason $reason, public string $initiator, // 'peer' | 'local' | 'system' public ?string $message = null, public ?Throwable $lastError = null, - public int $timestamp = null + public ?int $timestamp = null ) { $this->timestamp = $timestamp ?? time(); } From 3d3c02d62a9b8ddcd4baba3be856b12a034c4b5a Mon Sep 17 00:00:00 2001 From: cclilshy Date: Wed, 10 Sep 2025 18:17:25 +0800 Subject: [PATCH 5/7] =?UTF-8?q?fix:=20=E5=8F=96=E6=B6=88=E5=AF=B9=E6=96=87?= =?UTF-8?q?=E4=BB=B6=E7=B1=BB=E5=9E=8B=E7=9A=84stream=E8=BF=9B=E8=A1=8Cepo?= =?UTF-8?q?ll=E6=93=8D=E4=BD=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- composer.json | 1 + src/Coroutine/Coroutine.php | 168 +++++++++++++++++------------------ src/File/File.php | 40 +-------- src/Worker/WorkerContext.php | 3 - 4 files changed, 83 insertions(+), 129 deletions(-) diff --git a/composer.json b/composer.json index 22969a2..fcd58b3 100644 --- a/composer.json +++ b/composer.json @@ -22,6 +22,7 @@ ], "require": { "php": ">=8.1", + "ext-ffi": "*", "ext-sockets": "*", "ext-openssl": "*", "psr/http-message": "*", diff --git a/src/Coroutine/Coroutine.php b/src/Coroutine/Coroutine.php index 230cf8b..6803915 100644 --- a/src/Coroutine/Coroutine.php +++ b/src/Coroutine/Coroutine.php @@ -101,6 +101,50 @@ private function registerOnFork(): void }); } + /** + * @param Closure $closure + * + * @return Context + */ + public function go(Closure $closure): Context + { + $context = null; + $parentContext = getContext(); + $context = new Context(function () use ($closure, $parentContext, &$context) { + Context::extend($parentContext); + + try { + $this->dispatchEvent(new StartEvent($context)); + $result = $closure(); + $this->dispatchEvent(new CompleteEvent($context, $result)); + + } catch (EscapeException $exception) { + throw $exception; + + } catch (Throwable $exception) { + $this->dispatchEvent(new ErrorEvent($context, $exception)); + + } finally { + $this->dispatchEvent(new FinallyEvent($context)); + + Context::clear(); + $this->tracer->clear($context); + } + }); + + $this->fiber2context[$context->fiber] = WeakReference::create($context); + + try { + $context->start(); + } catch (EscapeException $exception) { + $this->handleEscapeException($exception); + } catch (Throwable $exception) { + // 有预期之外的异常泄漏 + Output::exception($exception); + } + return $context; + } + /** * * The coroutine that cannot be restored can only throw an exception. @@ -168,6 +212,41 @@ public static function throw(Context $context, Throwable $exception): void } } + /** + * @return Context + */ + public function getContext(): Context + { + if (!$fiber = Fiber::getCurrent()) { + return new SuspensionProxy(EventLoop::getSuspension()); + } + return ($this->fiber2context[$fiber] ?? null)?->get() ?? new SuspensionProxy(EventLoop::getSuspension()); + } + + /** + * @param EscapeException $exception + * + * @return void + */ + #[NoReturn] + public function handleEscapeException(EscapeException $exception): void + { + Process::getInstance()->processedInMain($exception->lastWords); + } + + /** + * @param int|float $second + * + * @return int|float + * @throws Throwable + */ + public function sleep(int|float $second): int|float + { + $context = getContext(); + delay(static fn () => Coroutine::resume($context, $second), $second); + return Coroutine::suspend($context); + } + /** * @param Closure $closure * @@ -189,50 +268,6 @@ public function async(Closure $closure): Promise }); } - /** - * @param Closure $closure - * - * @return Context - */ - public function go(Closure $closure): Context - { - $context = null; - $parentContext = getContext(); - $context = new Context(function () use ($closure, $parentContext, &$context) { - Context::extend($parentContext); - - try { - $this->dispatchEvent(new StartEvent($context)); - $result = $closure(); - $this->dispatchEvent(new CompleteEvent($context, $result)); - - } catch (EscapeException $exception) { - throw $exception; - - } catch (Throwable $exception) { - $this->dispatchEvent(new ErrorEvent($context, $exception)); - - } finally { - $this->dispatchEvent(new FinallyEvent($context)); - - Context::clear(); - $this->tracer->clear($context); - } - }); - - $this->fiber2context[$context->fiber] = WeakReference::create($context); - - try { - $context->start(); - } catch (EscapeException $exception) { - $this->handleEscapeException($exception); - } catch (Throwable $exception) { - // 有预期之外的异常泄漏 - Output::exception($exception); - } - return $context; - } - /** * This method is different from onReject, which allows accepting any type of rejected futures object. * When await promise is rejected, an error will be thrown instead of returning the rejected value. @@ -248,11 +283,7 @@ public function go(Closure $closure): Context public function await(Promise $promise): mixed { if ($promise->getStatus() === Promise::FULFILLED) { - $result = $promise->getResult(); - if ($result instanceof Promise) { - return $this->await($result); - } - return $result; + return $promise->getResult(); } if ($promise->getStatus() === Promise::REJECTED) { @@ -278,45 +309,6 @@ public function await(Promise $promise): mixed }); // Confirm that you have prepared to handle Fiber recovery and take over control of Fiber by suspending it - $result = Coroutine::suspend($context); - if ($result instanceof Promise) { - return $this->await($result); - } - return $result; - } - - /** - * @return Context - */ - public function getContext(): Context - { - if (!$fiber = Fiber::getCurrent()) { - return new SuspensionProxy(EventLoop::getSuspension()); - } - return ($this->fiber2context[$fiber] ?? null)?->get() ?? new SuspensionProxy(EventLoop::getSuspension()); - } - - /** - * @param EscapeException $exception - * - * @return void - */ - #[NoReturn] - public function handleEscapeException(EscapeException $exception): void - { - Process::getInstance()->processedInMain($exception->lastWords); - } - - /** - * @param int|float $second - * - * @return int|float - * @throws Throwable - */ - public function sleep(int|float $second): int|float - { - $context = getContext(); - delay(static fn () => Coroutine::resume($context, $second), $second); return Coroutine::suspend($context); } diff --git a/src/File/File.php b/src/File/File.php index e472a8f..7acbbb8 100644 --- a/src/File/File.php +++ b/src/File/File.php @@ -12,17 +12,11 @@ namespace Ripple\File; -use Ripple\Coroutine; -use Ripple\File\Exception\FileException; -use Ripple\Kernel; use Ripple\Stream; use Ripple\Support; -use Throwable; use function array_shift; use function Co\forked; -use function Co\getContext; -use function Co\thread; use function fopen; use function file_get_contents; @@ -57,44 +51,14 @@ private function registerOnFork(): void } /** + * @deprecated * @param string $path * * @return string|false - * @throws FileException */ public static function getContents(string $path): string|false { - try { - $kernel = Kernel::getInstance(); - $libEventMethod = $kernel->getLibEventMethod(); - if (!$libEventMethod || $libEventMethod === 'epoll') { - return file_get_contents($path); - } - - if (!$resource = fopen($path, 'r')) { - throw (new FileException('Failed to open file: ' . $path)); - } - - $stream = new Stream($resource); - $stream->setBlocking(false); - $content = ''; - $context = getContext(); - $stream->onReadable(static function (Stream $stream) use (&$content, $context) { - $fragment = ''; - while ($buffer = $stream->read(8192)) { - $fragment .= $buffer; - } - - $content .= $fragment; - if ($stream->eof()) { - $stream->close(); - Coroutine::resume($context, $content); - } - }); - return Coroutine::suspend(); - } catch (Throwable $exception) { - throw new FileException($exception->getMessage()); - } + return file_get_contents($path); } /** diff --git a/src/Worker/WorkerContext.php b/src/Worker/WorkerContext.php index e991b33..f4be6e9 100644 --- a/src/Worker/WorkerContext.php +++ b/src/Worker/WorkerContext.php @@ -19,12 +19,9 @@ use Ripple\Utils\Serialization\Zx7e; use function Co\async; -use function Co\delay; use function Co\process; use function socket_create_pair; use function socket_export_stream; -use function min; -use function pow; use function is_int; use const AF_INET; From 6d2880e0780ab4bf6d2b4bcac18c631e6e7af9fa Mon Sep 17 00:00:00 2001 From: cclilshy Date: Wed, 10 Sep 2025 19:35:51 +0800 Subject: [PATCH 6/7] =?UTF-8?q?fix:=20=E6=81=A2=E5=A4=8DPromise.await?= =?UTF-8?q?=E7=9A=84=E9=BB=98=E8=AE=A4=E5=B1=95=E5=B9=B3=E8=A1=8C=E4=B8=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- 1.js | 18 +++++++++++++++++ src/Coroutine/Coroutine.php | 12 +++++++++-- test_fix.php | 40 ------------------------------------- 3 files changed, 28 insertions(+), 42 deletions(-) create mode 100644 1.js delete mode 100644 test_fix.php diff --git a/1.js b/1.js new file mode 100644 index 0000000..4e22ecc --- /dev/null +++ b/1.js @@ -0,0 +1,18 @@ +async function func_a(){ + return 'hello world'; +} + +async function func_b(){ + return func_a(); +} + +async function func_c(){ + console.log(func_b()); + return func_b(); +} + +async function main(){ + console.log(await func_c()) +} + +main(); \ No newline at end of file diff --git a/src/Coroutine/Coroutine.php b/src/Coroutine/Coroutine.php index 6803915..4a2f4ae 100644 --- a/src/Coroutine/Coroutine.php +++ b/src/Coroutine/Coroutine.php @@ -283,7 +283,11 @@ public function async(Closure $closure): Promise public function await(Promise $promise): mixed { if ($promise->getStatus() === Promise::FULFILLED) { - return $promise->getResult(); + $result = $promise->getResult(); + if ($result instanceof Promise) { + return $this->await($result); + } + return $result; } if ($promise->getStatus() === Promise::REJECTED) { @@ -309,7 +313,11 @@ public function await(Promise $promise): mixed }); // Confirm that you have prepared to handle Fiber recovery and take over control of Fiber by suspending it - return Coroutine::suspend($context); + $result = Coroutine::suspend($context); + if ($result instanceof Promise) { + return $this->await($result); + } + return $result; } /** diff --git a/test_fix.php b/test_fix.php deleted file mode 100644 index 8d2c049..0000000 --- a/test_fix.php +++ /dev/null @@ -1,40 +0,0 @@ -reason->value}\n"; - echo " Initiator: {$closeEvent->initiator}\n"; - echo " Timestamp: {$closeEvent->timestamp}\n"; -} catch (Throwable $e) { - echo "❌ CloseEvent creation failed: {$e->getMessage()}\n"; - echo " File: {$e->getFile()}:{$e->getLine()}\n"; -} - -echo "\nTesting ConnectionException creation...\n"; - -try { - // Test ConnectionException creation (internal use only) - $exception = new ConnectionException(ConnectionAbortReason::RESET, 'Test reset'); - echo "✅ ConnectionException created successfully\n"; - echo " Reason: {$exception->reason->value}\n"; - echo " Message: {$exception->getMessage()}\n"; -} catch (Throwable $e) { - echo "❌ ConnectionException creation failed: {$e->getMessage()}\n"; -} - -echo "\nAll basic tests completed!\n"; From a213c4fd04eab26be1dba9197cbbe882280e7c0d Mon Sep 17 00:00:00 2001 From: cclilshy Date: Wed, 10 Sep 2025 20:01:40 +0800 Subject: [PATCH 7/7] =?UTF-8?q?doc:=20=E7=BB=9F=E4=B8=80=E4=BB=A3=E7=A0=81?= =?UTF-8?q?=E9=A3=8E=E6=A0=BC=E5=B9=B6=E8=A1=A5=E5=85=85=E6=96=87=E6=A1=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- 1.js | 18 ------- src/Stream.php | 2 +- src/Stream/CloseEvent.php | 6 +-- src/Stream/Exception/ConnectionException.php | 2 +- .../Exception/ConnectionTimeoutException.php | 2 +- src/Stream/PR-47.md | 51 +++++++++++++++++++ 6 files changed, 57 insertions(+), 24 deletions(-) delete mode 100644 1.js create mode 100644 src/Stream/PR-47.md diff --git a/1.js b/1.js deleted file mode 100644 index 4e22ecc..0000000 --- a/1.js +++ /dev/null @@ -1,18 +0,0 @@ -async function func_a(){ - return 'hello world'; -} - -async function func_b(){ - return func_a(); -} - -async function func_c(){ - console.log(func_b()); - return func_b(); -} - -async function main(){ - console.log(await func_c()) -} - -main(); \ No newline at end of file diff --git a/src/Stream.php b/src/Stream.php index 16ba535..217ed8a 100644 --- a/src/Stream.php +++ b/src/Stream.php @@ -347,7 +347,7 @@ private function emitWritableEnd(): void * @internal * Emit close event with reason */ - private function emitCloseEvent(ConnectionAbortReason $reason, string $initiator, ?Throwable $lastError = null): void + private function emitCloseEvent(ConnectionAbortReason $reason, string $initiator, Throwable|null $lastError = null): void { if ($this->isClosing) { return; // Already closing diff --git a/src/Stream/CloseEvent.php b/src/Stream/CloseEvent.php index 8db7293..47f8c5e 100644 --- a/src/Stream/CloseEvent.php +++ b/src/Stream/CloseEvent.php @@ -24,9 +24,9 @@ class CloseEvent public function __construct( public ConnectionAbortReason $reason, public string $initiator, // 'peer' | 'local' | 'system' - public ?string $message = null, - public ?Throwable $lastError = null, - public ?int $timestamp = null + public string|null $message = null, + public Throwable|null $lastError = null, + public int|null $timestamp = null ) { $this->timestamp = $timestamp ?? time(); } diff --git a/src/Stream/Exception/ConnectionException.php b/src/Stream/Exception/ConnectionException.php index d63efbb..61eb33b 100644 --- a/src/Stream/Exception/ConnectionException.php +++ b/src/Stream/Exception/ConnectionException.php @@ -37,7 +37,7 @@ final class ConnectionException extends RuntimeException implements AbortConnect public function __construct( public readonly ConnectionAbortReason $reason, string $message = '', - ?Throwable $previous = null + Throwable|null $previous = null ) { parent::__construct($message ?: $reason->value, 0, $previous); } diff --git a/src/Stream/Exception/ConnectionTimeoutException.php b/src/Stream/Exception/ConnectionTimeoutException.php index 7258c20..1b45b7d 100644 --- a/src/Stream/Exception/ConnectionTimeoutException.php +++ b/src/Stream/Exception/ConnectionTimeoutException.php @@ -22,7 +22,7 @@ class ConnectionTimeoutException extends TransportTimeoutException { public function __construct( string $message = "Connection timeout", - ?Throwable $previous = null, + Throwable|null $previous = null, ) { parent::__construct($message, 0, $previous); } diff --git a/src/Stream/PR-47.md b/src/Stream/PR-47.md new file mode 100644 index 0000000..94699ae --- /dev/null +++ b/src/Stream/PR-47.md @@ -0,0 +1,51 @@ +我仔细审阅了PR的描述和所有相关评论,包括机器人对每次修改的总结,以全面理解本次PR的核心动机和技术决策。基于这些信息,我总结了PR的发起原因、读写全工/半全工的取舍方案,以及关于禁止捕获关键异常类型的最终决策,并提供了相应的规范文档。 + +--- + +### 本次PR的发起原因 + +本次PR旨在解决 `ConnectionException` 在内部控制流和应用层错误之间模糊使用的问题,这种模糊性可能导致事件循环中的潜在死锁和不清晰的用户交互。核心目标是将 +`ConnectionException` 重新定义为内部控制流异常,并引入事件驱动的生命周期回调(`onClose`、`onReadableEnd`、`onWritableEnd` +),以提供清晰的连接状态管理和健壮的错误处理。 + +### 读写全工/半全工的取舍方案 + +为了更好地管理连接状态,特别是半关闭场景,PR引入了 `onClose`、`onReadableEnd` 和 `onWritableEnd` 这三个事件驱动的生命周期回调。 + +- `onReadableEnd` 和 `onWritableEnd` 专门用于处理连接的读端或写端关闭的半关闭情况。这使得反应器能够在发生致命I/O错误时可靠地终止连接,同时通过明确的事件通知用户连接的单向关闭状态。 +- 这种方案提供了比单一 `ConnectionException` 更细粒度和明确的连接状态管理方式,增强了系统的健壮性。 + +### 禁止捕获的关键异常类型的最终决策 + +核心决策是明确区分内部控制流异常和应用层错误。 + +- `ConnectionException` 现在专门用于反应器内部的控制流。 +- 引入了 `TransportException` 来处理更广泛的传输层错误,取代了许多之前 `ConnectionException` 的用法。 +- `TransportTimeoutException` 被添加用于特定的SSL握手超时,提供更精确的错误区分。 +- `AbortConnection` 接口作为标记接口,用于表示那些旨在中止连接的异常,这些异常对于反应器的内部状态管理至关重要。 +- **最终决策**: 这些内部控制流异常(`ConnectionException`、`TransportException`、`TransportTimeoutException` 以及任何实现 + `AbortConnection` 接口的异常)主要用于反应器的内部逻辑。应用程序代码通常**不应直接捕获** + 这些特定异常,以避免干扰反应器终止连接和管理状态的内部逻辑。相反,应用程序应依赖新的事件驱动回调(`onClose`、 + `onReadableEnd`、`onWritableEnd`)来获取连接状态变化和错误通知。如果应用程序确实需要捕获它们(例如,用于日志记录),则在处理后必须重新抛出它们,以确保反应器的控制流不被中断。 + +### 规范文档 + +#### 连接生命周期和错误处理指南 + +本次PR优化了连接生命周期事件和错误的处理方式,旨在构建一个更健壮、可预测的系统。 + +- **事件驱动的生命周期**: + - 不再依赖异常来处理连接状态变化,而是使用新的事件驱动回调: + - `onClose()`: 当连接完全关闭时触发。 + - `onReadableEnd()`: 当连接的读端关闭时触发(半关闭)。 + - `onWritableEnd()`: 当连接的写端关闭时触发(半关闭)。 + - 这些回调为连接状态转换(包括半关闭场景)提供了明确的通知,从而实现精确的资源管理和用户交互。 + +- **异常处理**: + - **内部控制流异常**: `ConnectionException`、`TransportException`、`TransportTimeoutException` 以及任何实现 + `AbortConnection` 接口的异常,主要用于反应器的内部控制流。 + - **通常不应捕获**: 应用程序级别的代码通常应**避免直接捕获**这些特定异常。捕获它们可能会干扰反应器终止连接和管理状态的内部逻辑,从而可能导致死锁或状态不一致。 + - **依赖回调**: 对于与连接终止相关的应用层错误处理,请依赖 `onClose` 回调,它将在反应器处理完内部异常后触发。 + - **特定用例**: 如果应用程序**必须**捕获这些异常(例如,用于特定的日志记录或调试),则在处理后重新抛出它们至关重要,以确保反应器能够完成其预期的控制流。不要抑制这些异常。 + +这种方法确保了反应器能够可靠地管理I/O错误和连接状态,同时为用户提供了清晰的、基于事件的机制来响应连接生命周期事件。