Skip to content

Commit fd40feb

Browse files
committed
Implemented left join operator
1 parent 012dca6 commit fd40feb

7 files changed

Lines changed: 423 additions & 0 deletions

File tree

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace RunOpenCode\Component\Dataset\Collector;
6+
7+
use RunOpenCode\Component\Dataset\Contract\CollectorInterface;
8+
use RunOpenCode\Component\Dataset\Contract\StreamInterface;
9+
use RunOpenCode\Component\Dataset\Exception\LogicException;
10+
use RunOpenCode\Component\Dataset\Exception\OutOfBoundsException;
11+
use RunOpenCode\Component\Dataset\Exception\UnsupportedException;
12+
13+
/**
14+
* @template TKey
15+
* @template TValue
16+
*
17+
* @implements CollectorInterface<iterable<TKey,TValue>>
18+
* @implements \IteratorAggregate<TKey, TValue>
19+
* @implements \ArrayAccess<TKey, list<TValue>>
20+
*/
21+
final class IndexCollector implements CollectorInterface, \ArrayAccess, \IteratorAggregate, \Countable
22+
{
23+
/**
24+
* {@inheritdoc}
25+
*/
26+
public mixed $value {
27+
get => $this->getIterator();
28+
}
29+
30+
/**
31+
* {@inheritdoc}
32+
*/
33+
public array $aggregated {
34+
get => $this->collection instanceof StreamInterface ? $this->collection->aggregated : [];
35+
}
36+
37+
/**
38+
* {@inheritdoc}
39+
*/
40+
public bool $closed {
41+
get => false;
42+
}
43+
44+
/** @var array<TKey, list<TValue>> */
45+
private array $scalarIndex = [];
46+
47+
/** @var \SplObjectStorage<TKey&object, list<TValue>> */
48+
private \SplObjectStorage $objectIndex;
49+
50+
/** @var array<array{TKey, TValue}> */
51+
private array $collected = [];
52+
53+
/**
54+
* @param iterable<TKey, TValue> $collection Collection to collect.
55+
*/
56+
public function __construct(
57+
private readonly iterable $collection,
58+
) {
59+
$this->objectIndex = new \SplObjectStorage();
60+
61+
foreach ($this->collection as $key => $value) {
62+
$this->collected[] = [$key, $value];
63+
64+
if (is_string($key) || is_int($key)) {
65+
$this->scalarIndex[$key] = $this->scalarIndex[$key] ?? [];
66+
$this->scalarIndex[$key][] = $value;
67+
continue;
68+
}
69+
70+
if (is_object($key)) {
71+
$current = $this->objectIndex->contains($key) ? $this->objectIndex[$key] : [];
72+
73+
$current[] = $value;
74+
$this->objectIndex[$key] = $current;
75+
continue;
76+
}
77+
78+
throw new UnsupportedException('Only object, string and integer keys are supported.');
79+
}
80+
}
81+
82+
public function getIterator(): \Traversable
83+
{
84+
foreach ($this->collected as [$key, $value]) {
85+
yield $key => $value;
86+
}
87+
}
88+
89+
public function offsetExists(mixed $offset): bool
90+
{
91+
return match (true) {
92+
is_string($offset) || is_int($offset) => \array_key_exists($offset, $this->scalarIndex),
93+
is_object($offset) => $this->objectIndex->contains($offset),
94+
default => throw new UnsupportedException('Only object, string and integer keys are supported.'),
95+
};
96+
}
97+
98+
/**
99+
* @param TKey $offset
100+
*
101+
* @return list<TValue>
102+
*/
103+
public function offsetGet(mixed $offset): mixed
104+
{
105+
if (!$this->offsetExists($offset)) {
106+
throw new OutOfBoundsException($offset, $this->value);
107+
}
108+
109+
return match (true) {
110+
is_string($offset) || is_int($offset) => $this->scalarIndex[$offset],
111+
is_object($offset) => $this->objectIndex[$offset],
112+
default => throw new UnsupportedException('Only object, string and integer keys are supported.'),
113+
};
114+
}
115+
116+
public function offsetSet(mixed $offset, mixed $value): void
117+
{
118+
throw new LogicException(\sprintf(
119+
'Cannot set value for key "%s". Collector "%s" is read-only.',
120+
\var_export($offset, true),
121+
self::class,
122+
));
123+
}
124+
125+
public function offsetUnset(mixed $offset): void
126+
{
127+
throw new LogicException(\sprintf(
128+
'Cannot unset value for key "%s". Collector "%s" is read-only.',
129+
\var_export($offset, true),
130+
self::class,
131+
));
132+
}
133+
134+
public function count(): int
135+
{
136+
return \count($this->collected);
137+
}
138+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace RunOpenCode\Component\Dataset\Exception;
6+
7+
/**
8+
* General library's runtime exception.
9+
*/
10+
class RuntimeException extends \RuntimeException implements ExceptionInterface
11+
{
12+
public function __construct(string $message, ?\Throwable $previous = null)
13+
{
14+
parent::__construct($message, 0, $previous);
15+
}
16+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace RunOpenCode\Component\Dataset\Exception;
6+
7+
/**
8+
* Thrown when an unsupported operation is attempted.
9+
*
10+
* Lack of support may be due to various reasons, such as:
11+
*
12+
* - The feature is not implemented in the current version.
13+
* - The underlying system or library does not provide support for the requested operation.
14+
* - The operation is not applicable in the current context or configuration.
15+
*/
16+
class UnsupportedException extends RuntimeException
17+
{
18+
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace RunOpenCode\Component\Dataset\Operator;
6+
7+
use RunOpenCode\Component\Dataset\AbstractStream;
8+
use RunOpenCode\Component\Dataset\Collector\IndexCollector;
9+
use RunOpenCode\Component\Dataset\Contract\OperatorInterface;
10+
use RunOpenCode\Component\Dataset\Stream;
11+
12+
/**
13+
* Left join operator.
14+
*
15+
* Left join operator iterates over left collection and joins items from right collection
16+
* based on matching keys. For each item in the left collection, yields an array containing
17+
* the left item and all matching items from the right collection.
18+
*
19+
* Example usage:
20+
*
21+
* ```php
22+
* use RunOpenCode\Component\Dataset\Operator\LeftJoin;
23+
*
24+
* $leftJoin = new LeftJoin(
25+
* left: new Dataset([1 => 'a', 2 => 'b', 3 => 'c']),
26+
* right: new Dataset([1 => 'x', 1 => 'y', 2 => 'z']),
27+
* );
28+
*
29+
* // The resulting sequence will be:
30+
* // 1 => ['a', ['x', 'y']]
31+
* // 2 => ['b', ['z']]
32+
* // 3 => ['c', []]
33+
* ```
34+
*
35+
* @template TKey
36+
* @template TLeftValue
37+
* @template TRightValue
38+
*
39+
* @extends AbstractStream<TKey, array{TLeftValue, iterable<TRightValue>}>
40+
* @implements OperatorInterface<TKey, array{TLeftValue, iterable<TRightValue>}>
41+
*/
42+
final class LeftJoin extends AbstractStream implements OperatorInterface
43+
{
44+
/**
45+
* @param iterable<TKey, TLeftValue> $source Source to iterate over.
46+
* @param iterable<TKey, TRightValue> $join Collection to join with.
47+
*/
48+
public function __construct(
49+
private readonly iterable $source,
50+
private readonly iterable $join
51+
) {
52+
parent::__construct($this->source);
53+
}
54+
55+
/**
56+
* {@inheritdoc}
57+
*/
58+
protected function iterate(): \Traversable
59+
{
60+
$join = new Stream($this->join)->collect(IndexCollector::class);
61+
62+
foreach ($this->source as $key => $value) {
63+
yield $key => [
64+
$value,
65+
$join->offsetExists($key) ? $join->offsetGet($key) : [],
66+
];
67+
}
68+
}
69+
}
70+

src/RunOpenCode/Component/Dataset/src/functions.php

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,27 @@ function merge(iterable $first, iterable $second): Stream
276276
);
277277
}
278278

279+
/**
280+
* Create left join operator.
281+
*
282+
* @template TKey of array-key
283+
* @template TLeftValue
284+
* @template TRightValue
285+
*
286+
* @param iterable<TKey, TLeftValue> $left Left collection to iterate over.
287+
* @param iterable<TKey, TRightValue> $right Right collection to join with.
288+
*
289+
* @return Stream<TKey, array{TLeftValue, iterable<TRightValue>}>
290+
*
291+
* @see Operator\LeftJoin
292+
*/
293+
function left_join(iterable $left, iterable $right): Stream
294+
{
295+
return new Stream(
296+
new Operator\LeftJoin($left, $right)
297+
);
298+
}
299+
279300
/**
280301
* Create overflow operator.
281302
*
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace RunOpenCode\Component\Dataset\Tests\Collector;
6+
7+
use PHPUnit\Framework\Attributes\Test;
8+
use PHPUnit\Framework\TestCase;
9+
use RunOpenCode\Component\Dataset\Collector\IndexCollector;
10+
use RunOpenCode\Component\Dataset\Exception\LogicException;
11+
use RunOpenCode\Component\Dataset\Reducer\Average;
12+
use RunOpenCode\Component\Dataset\Reducer\Count;
13+
use RunOpenCode\Component\Dataset\Reducer\Sum;
14+
use function RunOpenCode\Component\Dataset\collect;
15+
use function RunOpenCode\Component\Dataset\stream;
16+
17+
final class IndexCollectorTest extends TestCase
18+
{
19+
#[Test]
20+
public function iterates(): void
21+
{
22+
$generator = function() {
23+
yield 'a' => 1;
24+
yield 'a' => 2;
25+
yield 'b' => 3;
26+
};
27+
28+
$collector = collect($generator(), IndexCollector::class);
29+
30+
// iterator_to_array overwrites duplicate keys
31+
$this->assertSame(['a'=>2, 'b'=>3], \iterator_to_array($collector));
32+
33+
// Verify it's rewindable and contains all values
34+
$allValues = [];
35+
foreach ($collector->value as $key => $value) {
36+
$allValues[] = [$key, $value];
37+
}
38+
$this->assertSame([['a', 1], ['a', 2], ['b', 3]], $allValues);
39+
}
40+
41+
#[Test]
42+
public function array_access_scalar_keys(): void
43+
{
44+
$generator = function() {
45+
yield 0 => 'a';
46+
yield 0 => 'b';
47+
yield 1 => 'c';
48+
};
49+
50+
$collector = collect($generator(), IndexCollector::class);
51+
52+
$this->assertSame(['a', 'b'], $collector[0]);
53+
$this->assertSame(['c'], $collector[1]);
54+
}
55+
56+
#[Test]
57+
public function array_access_object_keys(): void
58+
{
59+
$object1 = new \stdClass();
60+
$object2 = new \stdClass();
61+
62+
$generator = function() use ($object1, $object2) {
63+
yield $object1 => 'a';
64+
yield $object1 => 'b';
65+
yield $object2 => 'c';
66+
};
67+
68+
$collector = collect($generator(), IndexCollector::class);
69+
70+
$this->assertSame(['a', 'b'], $collector[$object1]);
71+
$this->assertSame(['c'], $collector[$object2]);
72+
}
73+
74+
#[Test]
75+
public function counts(): void
76+
{
77+
$dataset = [2, 10, 5, 1];
78+
79+
$collector = collect($dataset, IndexCollector::class);
80+
81+
$this->assertCount(4, $collector);
82+
}
83+
84+
#[Test]
85+
public function aggregates(): void
86+
{
87+
$dataset = [2, 10];
88+
89+
$collector = stream($dataset)
90+
->aggregate('count', Count::class)
91+
->aggregate('sum', Sum::class)
92+
->aggregate('average', Average::class)
93+
->collect(IndexCollector::class);
94+
95+
$this->assertSame(2, $collector->aggregated['count']);
96+
$this->assertSame(12, $collector->aggregated['sum']);
97+
$this->assertEqualsWithDelta(6, $collector->aggregated['average'], 0.0001);
98+
}
99+
100+
#[Test]
101+
public function array_access_set_throws_exception(): void
102+
{
103+
$this->expectException(LogicException::class);
104+
105+
collect([], IndexCollector::class)[10] = ['bar'];
106+
}
107+
108+
#[Test]
109+
public function array_access_unset_throws_exception(): void
110+
{
111+
$this->expectException(LogicException::class);
112+
113+
unset(collect([], IndexCollector::class)[20]);
114+
}
115+
}

0 commit comments

Comments
 (0)