|
| 1 | +======================= |
| 2 | +Write your own operator |
| 3 | +======================= |
| 4 | + |
| 5 | +To create your own operator, you need to define a class that implements |
| 6 | +``RunOpenCode\Component\Dataset\Contract\OperatorInterface``. The library also |
| 7 | +provides a base class, ``RunOpenCode\Component\Dataset\AbstractStream``, |
| 8 | +which serves as a prototype for custom operators and significantly simplifies |
| 9 | +their implementation. |
| 10 | + |
| 11 | +.. _PHPStan: https://phpstan.org |
| 12 | + |
| 13 | +.. note:: This tutorial assumes that you are familiar with `PHPStan`_ and |
| 14 | + generics. |
| 15 | + |
| 16 | +For the purpose of this tutorial, we will implement a log() operator. The goal |
| 17 | +of this operator is to monitor a stream and write each item (key and value) to a |
| 18 | +file. |
| 19 | + |
| 20 | +.. note:: Security, usability, bugs, and edge cases are intentionally ignored |
| 21 | + in this example. The goal of the tutorial is to demonstrate the |
| 22 | + process of writing and using a custom operator. |
| 23 | + |
| 24 | +Creating the operator class |
| 25 | +--------------------------- |
| 26 | + |
| 27 | +First, we need to create our operator class and define its signature. |
| 28 | + |
| 29 | +.. code-block:: php |
| 30 | + :linenos: |
| 31 | +
|
| 32 | + <?php |
| 33 | +
|
| 34 | + declare(strict_types=1); |
| 35 | +
|
| 36 | + namespace App\Operator; |
| 37 | +
|
| 38 | + use RunOpenCode\Component\Dataset\AbstractStream; |
| 39 | + use RunOpenCode\Component\Dataset\Contract\OperatorInterface; |
| 40 | +
|
| 41 | + /** |
| 42 | + * @template TKey |
| 43 | + * @template TValue |
| 44 | + * |
| 45 | + * @extends AbstractStream<TKey, TValue> |
| 46 | + * @implements OperatorInterface<TKey, TValue> |
| 47 | + */ |
| 48 | + final class Log extends AbstractStream implements OperatorInterface |
| 49 | + { |
| 50 | + /** |
| 51 | + * @param iterable<TKey, TValue> $source Stream source to iterate over. |
| 52 | + * @param non-empty-string $file File where items should be stored. |
| 53 | + */ |
| 54 | + public function __construct( |
| 55 | + iterable $source, |
| 56 | + private readonly string $file, |
| 57 | + ) { |
| 58 | + parent::__construct($source); |
| 59 | + } |
| 60 | + } |
| 61 | +
|
| 62 | +The operator class extends ``RunOpenCode\Component\Dataset\AbstractStream`` and |
| 63 | +implements ``RunOpenCode\Component\Dataset\Contract\OperatorInterface``. |
| 64 | + |
| 65 | +The class defines two generic template parameters, ``TKey`` and ``TValue``, |
| 66 | +which describe the key and value types yielded by the operator. This information |
| 67 | +is required by PHPStan for correct type inference. |
| 68 | + |
| 69 | +The constructor accepts two arguments: |
| 70 | + |
| 71 | +* The source stream to iterate over. |
| 72 | +* A non-empty string representing the path to the file where stream items |
| 73 | + will be written. |
| 74 | + |
| 75 | +From the declared generic types and constructor signature, it is clear that this |
| 76 | +operator does not modify the original stream. |
| 77 | + |
| 78 | +.. note:: If you need to implement an operator that modifies the stream, you can |
| 79 | + refer to existing implementations such as |
| 80 | + ``RunOpenCode\Component\Dataset\Operator\Map`` or |
| 81 | + ``RunOpenCode\Component\Dataset\Operator\Merge``. |
| 82 | + |
| 83 | +The constructor must pass the source stream to the parent ``AbstractStream`` |
| 84 | +implementation. This is required so that the library can correctly track |
| 85 | +upstreams and provide proper support for aggregators. |
| 86 | + |
| 87 | +Lines 12 and 13 are required in order to provide information to the PHPStan what |
| 88 | +will operator yield. Line 22 provides information about input stream source and |
| 89 | +what input source streams. Line 23 defines required parameter for operator - a |
| 90 | +path to a file where stream items will be stored. |
| 91 | + |
| 92 | +It is clear from lines 12, 13 and 22 that operator does not modifies the |
| 93 | +original stream. |
| 94 | + |
| 95 | +Line 25 is required and in its essence, it will provide source stream to |
| 96 | +prototype stream implementation ``RunOpenCode\Component\Dataset\AbstractStream`` |
| 97 | +which will track upstreams and provide correct support for aggregators. |
| 98 | + |
| 99 | +Implementing the operator logic |
| 100 | +------------------------------- |
| 101 | + |
| 102 | +Next, implement the ``iterate(): \Traversable<TKey, TValue>`` method. |
| 103 | +This method is executed when the stream is iterated and contains the actual |
| 104 | +operator logic. |
| 105 | + |
| 106 | +.. code-block:: php |
| 107 | + :linenos: |
| 108 | +
|
| 109 | + <?php |
| 110 | +
|
| 111 | + declare(strict_types=1); |
| 112 | +
|
| 113 | + namespace App\Operator; |
| 114 | +
|
| 115 | + use RunOpenCode\Component\Dataset\AbstractStream; |
| 116 | + use RunOpenCode\Component\Dataset\Contract\OperatorInterface; |
| 117 | +
|
| 118 | + /** |
| 119 | + * @template TKey |
| 120 | + * @template TValue |
| 121 | + * |
| 122 | + * @extends AbstractStream<TKey, TValue> |
| 123 | + * @implements OperatorInterface<TKey, TValue> |
| 124 | + */ |
| 125 | + final class Log extends AbstractStream implements OperatorInterface |
| 126 | + { |
| 127 | + // Code omitted for the sake of readability. |
| 128 | +
|
| 129 | + /** |
| 130 | + * {@inheritdoc} |
| 131 | + */ |
| 132 | + public function iterate(): \Traversable |
| 133 | + { |
| 134 | + $handler = \Safe\fopen($this->file, 'wb'); // @see https://github.com/thecodingmachine/safe |
| 135 | +
|
| 136 | + try { |
| 137 | + foreach($this->source as $key => $value) { |
| 138 | + \Safe\fwrite($handler, \sprintf( |
| 139 | + 'Key: "%s", Value: "%s"', |
| 140 | + self::stringify($key), |
| 141 | + self::stringify($value), |
| 142 | + )); |
| 143 | +
|
| 144 | + yield $key => $value; |
| 145 | + } |
| 146 | + } finally { |
| 147 | + \Safe\fclose($handler); |
| 148 | + } |
| 149 | + } |
| 150 | +
|
| 151 | + /** |
| 152 | + * Cast anything to its string representation. |
| 153 | + */ |
| 154 | + private static function stringify(mixed $value): string |
| 155 | + { |
| 156 | + // Implementation omitted. |
| 157 | + } |
| 158 | + } |
| 159 | +
|
| 160 | +Inside this method, a file handler is opened using the provided file path. |
| 161 | +The source stream is then iterated item by item. |
| 162 | + |
| 163 | +For each key–value pair: |
| 164 | + |
| 165 | +* The key and value are converted to their string representations |
| 166 | + (implementation of method ``Log::stringify()`` is omitted for the sake of |
| 167 | + readability). |
| 168 | +* The formatted output is written to the file. |
| 169 | +* The original key and value are yielded back to the stream. |
| 170 | + |
| 171 | +**Yielding the original key and value is the most important step**, as it allows |
| 172 | +the stream to continue flowing to downstream operators or consumers. |
| 173 | + |
| 174 | +The file handler is closed in a finally block to ensure that resources are |
| 175 | +released even if an error occurs during iteration. |
| 176 | + |
| 177 | +.. warning:: This implementation assumes that the stream will be fully iterated, |
| 178 | + which is not always the case. Consumer of the stream can break |
| 179 | + iteration (either by using ``break`` in loop, or by using operator |
| 180 | + which limits number of iterations, such as ``take()``). When |
| 181 | + writing your own operators, always account for early termination |
| 182 | + and ensure that resources are handled correctly. |
| 183 | + |
| 184 | +Testing the operator |
| 185 | +-------------------- |
| 186 | + |
| 187 | +Once the operator is complete, it can be unit tested without requiring any |
| 188 | +external dependencies. |
| 189 | + |
| 190 | +.. code-block:: php |
| 191 | + :linenos: |
| 192 | +
|
| 193 | + <?php |
| 194 | +
|
| 195 | + declare(strict_types=1); |
| 196 | +
|
| 197 | + namespace App\Tests\Operator\Log; |
| 198 | +
|
| 199 | + use App\Operator\Log; |
| 200 | + use PHPUnit\Framework\Attributes\Test; |
| 201 | + use PHPUnit\Framework\TestCase; |
| 202 | +
|
| 203 | + final class LogTest extends TestCase |
| 204 | + { |
| 205 | + #[Test] |
| 206 | + public function logs(): void |
| 207 | + { |
| 208 | + $this->assertFileDoesNotExist('/tmp/test_logs.log'); |
| 209 | +
|
| 210 | + $operator = new Log([1, 2, 3], '/tmp/test_logs.log'); |
| 211 | + |
| 212 | + \iterator_to_array($operator); |
| 213 | +
|
| 214 | + $this->assertSame( |
| 215 | + \Safe\file_get_contents('/path/to/expected/output/file.log'), |
| 216 | + \Safe\file_get_contents('/tmp/test_logs.log'), |
| 217 | + ); |
| 218 | +
|
| 219 | + \Safe\unlink('/tmp/test_logs.log'); |
| 220 | + } |
| 221 | + } |
| 222 | +
|
| 223 | +Using the operator |
| 224 | +------------------ |
| 225 | + |
| 226 | +When using streams in an object-oriented style, you can call the ``operator()`` |
| 227 | +method on an instance of ``RunOpenCode\Component\Dataset\Stream`` to apply your |
| 228 | +operator. |
| 229 | + |
| 230 | + |
| 231 | +.. code-block:: php |
| 232 | + :linenos: |
| 233 | +
|
| 234 | + <?php |
| 235 | +
|
| 236 | + declare(strict_types=1); |
| 237 | +
|
| 238 | + use RunOpenCode\Component\Dataset\Stream; |
| 239 | + use App\Operator\Log; |
| 240 | +
|
| 241 | + Stream(...) |
| 242 | + ->operator(Log::class, '/path/to/file.log'); |
| 243 | +
|
| 244 | +When using the functional style in PHP 8.5 or later, the ``operator()`` function |
| 245 | +is also available. |
| 246 | + |
| 247 | +.. code-block:: php |
| 248 | + :linenos: |
| 249 | +
|
| 250 | + <?php |
| 251 | +
|
| 252 | + declare(strict_types=1); |
| 253 | +
|
| 254 | + use function RunOpenCode\Component\Dataset\stream; |
| 255 | + use function RunOpenCode\Component\Dataset\operator; |
| 256 | + use App\Operator\Log; |
| 257 | +
|
| 258 | + $source = [...]; |
| 259 | +
|
| 260 | + $processed = $source |> stream(...) |
| 261 | + |> static fn(iterable $stream): Stream => operator($stream, Log::class, '/path/to/file.log'); |
| 262 | +
|
| 263 | +General advices for implementing operators |
| 264 | +------------------------------------------ |
| 265 | + |
| 266 | +* **Keep your operators simple.** The general idea of each operator is to |
| 267 | + perform one simple, fundamental operation. Any data-processing complexity |
| 268 | + should be achieved by composing multiple simple operators, not by implementing |
| 269 | + a single complex operator that performs multiple tasks simultaneously. |
| 270 | +* **Reuse existing operators whenever possible.** If your operator can be |
| 271 | + expressed as a composition of existing operators, with only a small amount of |
| 272 | + custom stream-specific logic, do not reimplement existing functionality. |
| 273 | + Existing operators are already unit-tested, which makes it easier to reason |
| 274 | + about their composition than to write everything from scratch. |
| 275 | +* **Keys can be anything — do not break this assumption.** A generator may emit |
| 276 | + values with any type of key. |
| 277 | +* **Keys are not unique — do not break this assumption.** A generator may emit |
| 278 | + multiple items with the same key. |
| 279 | +* **Streams are not rewindable — do not break this assumption.** Although |
| 280 | + ``array`` is ``iterable``, ``\Generator`` is also ``iterable``, and generators |
| 281 | + cannot be rewound. |
| 282 | +* **Streams may not be fully iterated** — do not assume that users of your |
| 283 | + operator will consume the entire stream. They may use ``break`` or ``take()`` |
| 284 | + in their code. |
| 285 | +* **Do not lock or hold resources.** If you do, release them when the operator |
| 286 | + finishes streaming. Since there is no reliable way to detect whether streaming |
| 287 | + was interrupted (for example, via ``break`` or another operator), ensure that |
| 288 | + your operator implements the ``__destruct()`` method to properly release any |
| 289 | + resources on garbage collection. |
| 290 | + |
| 291 | +If you believe your operator would be valuable to the wider community, feel free |
| 292 | +to submit a pull request! |
0 commit comments