Skip to content

Commit 2eee870

Browse files
authored
Merge pull request #12 from RunOpenCode/dev
Dev
2 parents ec332aa + 2b8c4b1 commit 2eee870

12 files changed

Lines changed: 631 additions & 4 deletions

File tree

docs/source/components/dataset/operators/list/index.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ Available operators
1313
finalize
1414
flatten
1515
if_empty
16+
left_join
1617
map
1718
merge
1819
overflow
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
==========
2+
leftJoin()
3+
==========
4+
5+
This operator mimics the SQL LEFT JOIN found in relational databases.
6+
7+
It iterates over the source stream and joins each item with values from the
8+
joining stream based on strict key equality.
9+
10+
The operator yields the key from the source stream and a tuple containing the
11+
source value as the first element and an iterable of joined values from the
12+
joining stream as the second element.
13+
14+
15+
.. warning:: This operator is not memory-efficient. Memory consumption depends
16+
on the size of the joining stream.
17+
18+
19+
.. php:namespace:: RunOpenCode\Component\Dataset\Operator
20+
21+
22+
.. php:class:: LeftJoin
23+
24+
.. php:method:: __construct(iterable<TKey, TValue> $source, iterable<TKey, TJoinValue> $join)
25+
26+
:param $source: ``iterable<TKey, TValue>`` Stream source to iterate over on the left side of the left join operation.
27+
:param $join: ``iterable<TKey, TJoinValue>`` Stream source to iterate over on the right side of the left join operation.
28+
29+
30+
.. php:method:: getIterator()
31+
32+
:returns: ``\Traversable<TKey, TValue>`` Stream source joined with values from joining stream source.
33+
34+
Use cases
35+
---------
36+
37+
* Join operation is required which can not be executed on database level.
38+
39+
Example
40+
-------
41+
42+
Migrate users and their addresses from legacy system in batches.
43+
44+
45+
46+
.. code-block:: php
47+
:linenos:
48+
49+
<?php
50+
51+
use App\Model\User;
52+
use App\Model\Address;
53+
use RunOpenCode\Component\Dataset\Stream;
54+
use RunOpenCode\Component\Dataset\Model\Buffer;
55+
56+
$users = $database->executeQuery('SELECT * FROM users...');
57+
58+
new Stream($users)
59+
->bufferCount(100)
60+
->map(function(Buffer $buffer) use ($database): array {
61+
$users = $buffer
62+
->stream()
63+
->map(keyTransform: static fn(array $row): int => $row['id'])
64+
->collect(ArrayCollector::class);
65+
66+
$addresses = new Stream($database->executeQuery('SELECT * FROM addresses ... WHERE user_id IN (:ids)', [
67+
'ids' => \array_keys($users),
68+
]))->map(keyTransform: static fn(array $row): int => $row['user_id']);
69+
70+
return [ $users, $addresses ];
71+
})
72+
->map(function(array $batch): Stream {
73+
[$users, $addresses] = $batch;
74+
75+
return new Stream($users)
76+
->leftJoin($addresses);
77+
})
78+
->flatten()
79+
->map(valueTransform: function(array $joined): User {
80+
[$user, $addresses] = $joined;
81+
82+
return User::fromArray($user)
83+
->setAddresses(\array_map(static fn(array $address): Address => Address::fromArray($address), $addresses));
84+
})
85+
->bufferCount(100)
86+
->tap(function(Buffer $buffer) use ($orm) {
87+
$buffer
88+
->stream()
89+
->tap(function(User $user) use ($orm) {
90+
$orm->persist($user);
91+
})
92+
->flush();
93+
94+
$orm->flush();
95+
$orm->clear();
96+
})
97+
->flush();
Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace RunOpenCode\Component\Dataset\Collector;
6+
7+
use RunOpenCode\Component\Dataset\Contract\CollectorInterface;
8+
use RunOpenCode\Component\Dataset\Contract\StreamInterface;
9+
use RunOpenCode\Component\Dataset\Exception\LogicException;
10+
use RunOpenCode\Component\Dataset\Exception\OutOfBoundsException;
11+
use RunOpenCode\Component\Dataset\Exception\UnsupportedException;
12+
13+
/**
14+
* Collects items into an iterable, indexing values by their keys.
15+
*
16+
* The collector assumes that keys are not unique; therefore, accessing
17+
* a value by key returns a list of values.
18+
*
19+
* Currently, the allowed key types are scalar values
20+
* (int, float, string, bool, null) and objects.
21+
*
22+
* @template TKey
23+
* @template TValue
24+
*
25+
* @implements CollectorInterface<iterable<TKey,TValue>>
26+
* @implements \IteratorAggregate<TKey, TValue>
27+
* @implements \ArrayAccess<TKey, list<TValue>>
28+
*/
29+
final class IndexedCollector implements CollectorInterface, \ArrayAccess, \IteratorAggregate, \Countable
30+
{
31+
/**
32+
* {@inheritdoc}
33+
*/
34+
public mixed $value {
35+
get => $this->getIterator();
36+
}
37+
38+
/**
39+
* {@inheritdoc}
40+
*/
41+
public array $aggregated {
42+
get => $this->source instanceof StreamInterface ? $this->source->aggregated : [];
43+
}
44+
45+
/**
46+
* {@inheritdoc}
47+
*/
48+
public bool $closed {
49+
get => false;
50+
}
51+
52+
/**
53+
* Index of values with keys of scalar type.
54+
*
55+
* @var array<TKey, list<TValue>>
56+
*/
57+
private array $scalarIndex = [];
58+
59+
/**
60+
* Index of values with keys of object type.
61+
*
62+
* @var \SplObjectStorage<TKey&object, list<TValue>>
63+
*/
64+
private \SplObjectStorage $objectIndex;
65+
66+
/**
67+
* Collected values from stream.
68+
*
69+
* @var array<array{TKey, TValue}>
70+
*/
71+
private array $collected = [];
72+
73+
/**
74+
* @param iterable<TKey, TValue> $source Stream source to collect.
75+
*/
76+
public function __construct(
77+
private readonly iterable $source,
78+
) {
79+
$this->objectIndex = new \SplObjectStorage();
80+
81+
foreach ($this->source as $key => $value) {
82+
$this->collected[] = [$key, $value];
83+
84+
if (\is_string($key) || \is_int($key)) {
85+
$this->scalarIndex[$key] = $this->scalarIndex[$key] ?? [];
86+
$this->scalarIndex[$key][] = $value;
87+
continue;
88+
}
89+
90+
if (\is_object($key)) {
91+
$current = $this->objectIndex->contains($key) ? $this->objectIndex[$key] : [];
92+
93+
$current[] = $value;
94+
$this->objectIndex[$key] = $current;
95+
continue;
96+
}
97+
98+
throw new UnsupportedException('Only object, string and integer keys are supported.');
99+
}
100+
}
101+
102+
/**
103+
* {@inheritdoc}
104+
*/
105+
public function getIterator(): \Traversable
106+
{
107+
foreach ($this->collected as [$key, $value]) {
108+
yield $key => $value;
109+
}
110+
}
111+
112+
/**
113+
* {@inheritdoc}
114+
*/
115+
public function offsetExists(mixed $offset): bool
116+
{
117+
return match (true) {
118+
\is_string($offset) || \is_int($offset) => \array_key_exists($offset, $this->scalarIndex),
119+
\is_object($offset) => $this->objectIndex->contains($offset),
120+
default => throw new UnsupportedException('Only object, string and integer keys are supported.'),
121+
};
122+
}
123+
124+
/**
125+
* Get values for given key.
126+
*
127+
* @param TKey $offset
128+
*
129+
* @return list<TValue>
130+
*/
131+
public function offsetGet(mixed $offset): mixed
132+
{
133+
if (!$this->offsetExists($offset)) {
134+
throw new OutOfBoundsException($offset, $this->value);
135+
}
136+
137+
return match (true) {
138+
\is_string($offset) || \is_int($offset) => $this->scalarIndex[$offset],
139+
\is_object($offset) => $this->objectIndex[$offset],
140+
default => throw new UnsupportedException('Only object, string and integer keys are supported.'),
141+
};
142+
}
143+
144+
/**
145+
* {@inheritdoc}
146+
*/
147+
public function offsetSet(mixed $offset, mixed $value): void
148+
{
149+
throw new LogicException(\sprintf(
150+
'Cannot set value for key "%s". Collector "%s" is read-only.',
151+
\var_export($offset, true),
152+
self::class,
153+
));
154+
}
155+
156+
/**
157+
* {@inheritdoc}
158+
*/
159+
public function offsetUnset(mixed $offset): void
160+
{
161+
throw new LogicException(\sprintf(
162+
'Cannot unset value for key "%s". Collector "%s" is read-only.',
163+
\var_export($offset, true),
164+
self::class,
165+
));
166+
}
167+
168+
/**
169+
* {@inheritdoc}
170+
*/
171+
public function count(): int
172+
{
173+
return \count($this->collected);
174+
}
175+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace RunOpenCode\Component\Dataset\Exception;
6+
7+
/**
8+
* Thrown when an unsupported operation is attempted.
9+
*
10+
* Lack of support may be due to various reasons, such as:
11+
*
12+
* - The feature is not implemented in the current version.
13+
* - The underlying system or library does not provide support for the requested operation.
14+
* - The operation is not applicable in the current context or configuration.
15+
*/
16+
class UnsupportedException extends RuntimeException
17+
{
18+
}

src/RunOpenCode/Component/Dataset/src/Model/Buffer.php

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
namespace RunOpenCode\Component\Dataset\Model;
66

77
use RunOpenCode\Component\Dataset\Exception\LogicException;
8+
use RunOpenCode\Component\Dataset\Stream;
89

910
/**
1011
* Buffer of iterated items from collection.
@@ -30,6 +31,18 @@ public function __construct(private \ArrayObject $items)
3031
// noop.
3132
}
3233

34+
/**
35+
* Create stream from buffer.
36+
*
37+
* @return Stream<TKey, TValue>
38+
*
39+
* @phpstan-ignore-next-line generics.variance
40+
*/
41+
public function stream(): Stream
42+
{
43+
return new Stream($this);
44+
}
45+
3346
/**
3447
* Get first item in buffer.
3548
*

0 commit comments

Comments
 (0)