Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
65 commits
Select commit Hold shift + click to select a range
592ff64
init
aglinxinyuan Feb 9, 2026
b95465d
update
aglinxinyuan Feb 11, 2026
f83041a
Merge branch 'main' into xinyuan-loop-feb
aglinxinyuan Feb 11, 2026
d9d0cd9
update
aglinxinyuan Feb 11, 2026
144ae29
update
aglinxinyuan Feb 11, 2026
7b13fef
update
aglinxinyuan Feb 11, 2026
bd5ac3a
update
aglinxinyuan Feb 11, 2026
19be0c1
update
aglinxinyuan Feb 11, 2026
706884f
update
aglinxinyuan Feb 11, 2026
21e6a41
update
aglinxinyuan Feb 11, 2026
24da3e3
update
aglinxinyuan Feb 11, 2026
2ba0fa4
update
aglinxinyuan Feb 11, 2026
a05ffd1
update
aglinxinyuan Feb 11, 2026
44fc0e7
update
aglinxinyuan Feb 11, 2026
846aac2
update
aglinxinyuan Feb 12, 2026
6be7dc5
update
aglinxinyuan Feb 12, 2026
d8338d1
update
aglinxinyuan Feb 12, 2026
36e517e
fix
aglinxinyuan Feb 13, 2026
a4bfbdb
fix
aglinxinyuan Feb 13, 2026
393faac
Merge branch 'main' into xinyuan-loop-feb
aglinxinyuan Feb 14, 2026
cbb2fc7
Merge branch 'xiaozhen-sync-region-kill' into xinyuan-loop-feb
aglinxinyuan Feb 14, 2026
084f602
update
aglinxinyuan Feb 15, 2026
ae0d4ed
Merge branch 'xiaozhen-sync-region-kill' into xinyuan-loop-feb
aglinxinyuan Feb 15, 2026
55d5cec
update
aglinxinyuan Feb 15, 2026
b8faf93
update
aglinxinyuan Feb 15, 2026
a53506a
update
aglinxinyuan Feb 15, 2026
d44a664
update
aglinxinyuan Feb 15, 2026
1cd48fd
update
aglinxinyuan Feb 15, 2026
e35a332
update
aglinxinyuan Feb 16, 2026
4d18d1d
Merge branch 'xiaozhen-sync-region-kill' into xinyuan-loop-feb
aglinxinyuan Feb 17, 2026
30a8562
update
aglinxinyuan Feb 24, 2026
b717fb0
update
aglinxinyuan Feb 24, 2026
da8d6ed
Merge remote-tracking branch 'origin/xinyuan-loop-feb' into xinyuan-l…
aglinxinyuan Feb 24, 2026
04fe614
update
aglinxinyuan Feb 27, 2026
160bc6d
Merge branch 'xiaozhen-sync-region-kill' into xinyuan-loop-feb
aglinxinyuan Feb 27, 2026
bd27031
update
aglinxinyuan Feb 28, 2026
99e0f86
update
aglinxinyuan Feb 28, 2026
53ae08b
update
aglinxinyuan Feb 28, 2026
8c7d53c
update
aglinxinyuan Feb 28, 2026
92ab10f
update
aglinxinyuan Mar 1, 2026
bc58566
update
aglinxinyuan Mar 1, 2026
c655856
update
aglinxinyuan Mar 1, 2026
0970a53
update
aglinxinyuan Mar 1, 2026
2b78e2c
update
aglinxinyuan Mar 1, 2026
55f288f
update
aglinxinyuan Mar 1, 2026
2ccde1e
update
aglinxinyuan Mar 1, 2026
9b0d14d
update
aglinxinyuan Mar 1, 2026
3a2d0b9
update
aglinxinyuan Mar 1, 2026
0cfcf2f
update
aglinxinyuan Mar 1, 2026
00e49a5
update
aglinxinyuan Mar 1, 2026
f71dbec
update
aglinxinyuan Mar 1, 2026
565ee71
update
aglinxinyuan Mar 1, 2026
aa444a0
update
aglinxinyuan Mar 2, 2026
2e7c72a
update
aglinxinyuan Mar 2, 2026
f8ce99f
update
aglinxinyuan Mar 2, 2026
fe7e071
update
aglinxinyuan Mar 2, 2026
ba1b50f
Merge branch 'xiaozhen-sync-region-kill' into xinyuan-loop-feb
aglinxinyuan Mar 4, 2026
08679f0
update
aglinxinyuan Mar 4, 2026
b18d9db
update
aglinxinyuan Mar 4, 2026
dac211a
update
aglinxinyuan Mar 5, 2026
43f2ca6
update
aglinxinyuan Mar 5, 2026
3ada4c2
update
aglinxinyuan Mar 8, 2026
be43608
update
aglinxinyuan Mar 8, 2026
f09c9e3
update
aglinxinyuan Mar 8, 2026
64e2dda
Merge branch 'xiaozhen-sync-region-kill' into xinyuan-loop-feb
aglinxinyuan Mar 8, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ message ControlRequest {
PortCompletedRequest portCompletedRequest = 9;
WorkerStateUpdatedRequest workerStateUpdatedRequest = 10;
LinkWorkersRequest linkWorkersRequest = 11;
IterationCompletedRequest iterationCompletedRequest = 12;

// request for worker
AddInputChannelRequest addInputChannelRequest = 50;
Expand All @@ -58,6 +59,7 @@ message ControlRequest {
PrepareCheckpointRequest prepareCheckpointRequest = 57;
QueryStatisticsRequest queryStatisticsRequest = 58;


// request for testing
Ping ping = 100;
Pong pong = 101;
Expand Down Expand Up @@ -278,4 +280,8 @@ enum StatisticsUpdateTarget {
message QueryStatisticsRequest{
repeated core.ActorVirtualIdentity filterByWorkers = 1;
StatisticsUpdateTarget updateTarget = 2;
}

// Sent by a worker to notify the controller that one loop iteration has
// finished, identifying which loop-start operator the iteration belongs to.
message IterationCompletedRequest{
// Identity of the operator that starts the loop region.
// no_box makes scalapb generate a non-Option field.
// NOTE(review): proto style convention is lower_snake_case field names
// (loop_start_id); renaming changes generated accessors — confirm before release.
core.OperatorIdentity LoopStartId = 1 [(scalapb.field).no_box = true];
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ service ControllerService {
rpc PauseWorkflow(EmptyRequest) returns (EmptyReturn);
rpc WorkerStateUpdated(WorkerStateUpdatedRequest) returns (EmptyReturn);
rpc WorkerExecutionCompleted(EmptyRequest) returns (EmptyReturn);
rpc IterationCompleted(IterationCompletedRequest) returns (EmptyReturn);
rpc LinkWorkers(LinkWorkersRequest) returns (EmptyReturn);
rpc ControllerInitiateQueryStatistics(QueryStatisticsRequest) returns (EmptyReturn);
rpc RetryWorkflow(RetryWorkflowRequest) returns (EmptyReturn);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ def __init__(self, worker_id: str):
PortIdentity, typing.Tuple[Queue, PortStorageWriter, Thread]
] = dict()

self._storage_uris: typing.Dict[PortIdentity, str] = dict()

def is_missing_output_ports(self):
"""
This method is only used for ensuring correct region execution.
Expand Down Expand Up @@ -126,6 +128,7 @@ def set_up_port_storage_writer(self, port_id: PortIdentity, storage_uri: str):
Create a separate thread for saving output tuples of a port
to storage in batch.
"""
self._storage_uris[port_id] = storage_uri
document, _ = DocumentFactory.open_document(storage_uri)
buffered_item_writer = document.writer(str(get_worker_index(self.worker_id)))
writer_queue = Queue()
Expand Down Expand Up @@ -171,6 +174,21 @@ def save_tuple_to_storage_if_needed(self, tuple_: Tuple, port_id=None) -> None:
PortStorageWriterElement(data_tuple=tuple_)
)

def save_state_to_storage_if_needed(self, state: State, port_id=None) -> None:
    """Persist an output ``State`` to the storage of one or all ports.

    :param state: the State to persist; its ``schema`` attribute is used as
        the document schema.
    :param port_id: when None, the state is written to every port that has a
        registered storage URI; when given, only to that port's storage.
        Silently does nothing if the given port has no storage registered.
    """
    if port_id is None:
        uris = list(self._storage_uris.values())
    elif port_id in self._storage_uris:
        uris = [self._storage_uris[port_id]]
    else:
        # No storage configured for this port; nothing to persist.
        return

    for uri in uris:
        # Derive the state document URI from the result URI.
        # NOTE(review): assumes every registered URI contains "/result";
        # confirm against VFSURIFactory's URI layout.
        state_uri = uri.replace("/result", "/state")
        document = DocumentFactory.create_document(state_uri, state.schema)
        writer = document.writer(str(get_worker_index(self.worker_id)))
        try:
            # NOTE(review): vars(state) includes the `schema` attribute that
            # State stores in __dict__ — verify Tuple/the writer tolerates or
            # filters that extra key.
            writer.put_one(Tuple(vars(state)))
        finally:
            # Always release the writer, even if the write fails (the original
            # leaked it on exception).
            writer.close()

def close_port_storage_writers(self) -> None:
"""
Flush the buffers of port storage writers and wait for all the
Expand Down
30 changes: 30 additions & 0 deletions amber/src/main/python/core/models/operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,3 +293,33 @@ def process_table(self, table: Table, port: int) -> Iterator[Optional[TableLike]
time, or None.
"""
yield


class LoopStartOperator(TableOperator):
    """Base class for operators that mark the entry point of a loop region.

    Concrete subclasses only need to supply ``process_table``; the lifecycle
    hooks are intentional no-ops.
    """

    def open(self) -> None:
        """No resources are acquired by default."""

    @abstractmethod
    def process_table(self, table: Table, port: int) -> Iterator[Optional[TableLike]]:
        """Process one input table from the given port.

        Must be implemented by subclasses; defined as a generator so
        implementations can yield zero or more results.
        """
        yield

    def close(self) -> None:
        """No resources are released by default."""


class LoopEndOperator(TableOperator):
    """Base class for operators that mark the exit point of a loop region.

    Exposes the loop-control values carried in the operator's state:
    whether another iteration should run and which loop-start operator
    the region belongs to.
    """

    def open(self) -> None:
        """No resources are acquired by default."""

    @abstractmethod
    def process_table(self, table: Table, port: int) -> Iterator[Optional[TableLike]]:
        """Process one input table from the given port; subclasses implement.

        Defined as a generator so implementations can yield zero or more
        results.
        """
        yield

    def close(self) -> None:
        """No resources are released by default."""

    def condition(self):
        """Return the loop-continuation flag stored under state["condition"]."""
        flag = self.state["condition"]
        return flag

    def loop_start_id(self):
        """Return the id of the loop-start operator, stored under state["LoopStartId"]."""
        start_id = self.state["LoopStartId"]
        return start_id
7 changes: 7 additions & 0 deletions amber/src/main/python/core/models/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ def __init__(
self.__dict__.update(table.to_pandas().iloc[0].to_dict())
self.schema = Schema(table.schema)

@classmethod
def from_tuple(cls, tuple, schema):
    """Construct a State from an existing Tuple and its Schema.

    Copies every field of the tuple into the new instance's __dict__ and
    attaches the given schema. (The parameter name ``tuple`` shadows the
    builtin but is kept for interface compatibility.)
    """
    state = cls()
    state.__dict__.update(tuple.as_dict())
    state.schema = schema
    return state

def add(
self, key: str, value: any, value_type: Optional[AttributeType] = None
) -> None:
Expand Down
1 change: 1 addition & 0 deletions amber/src/main/python/core/runnables/data_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ def process_state(self, state: State) -> None:
self._context.worker_id,
self._context.console_message_manager.print_buf,
):
self._switch_context()
self._set_output_state(executor.process_state(state, port_id))

except Exception as err:
Expand Down
33 changes: 20 additions & 13 deletions amber/src/main/python/core/runnables/main_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
ECMElement,
InternalQueueElement,
)
from core.models.operator import LoopEndOperator, LoopStartOperator
from core.models.state import State
from core.runnables.data_processor import DataProcessor
from core.util import StoppableQueueBlockingRunnable, get_one_of
Expand All @@ -47,7 +48,7 @@
ActorVirtualIdentity,
PortIdentity,
ChannelIdentity,
EmbeddedControlMessageIdentity,
EmbeddedControlMessageIdentity, OperatorIdentity,
)
from proto.org.apache.texera.amber.engine.architecture.rpc import (
ConsoleMessage,
Expand All @@ -60,7 +61,7 @@
EmbeddedControlMessageType,
EmbeddedControlMessage,
AsyncRpcContext,
ControlRequest,
ControlRequest, IterationCompletedRequest,
)
from proto.org.apache.texera.amber.engine.architecture.worker import (
WorkerState,
Expand Down Expand Up @@ -94,12 +95,15 @@ def complete(self) -> None:
"""
# flush the buffered console prints
self._check_and_report_console_messages(force_flush=True)
self.context.executor_manager.executor.close()
controller_interface = self._async_rpc_client.controller_stub()
executor = self.context.executor_manager.executor
if isinstance(executor, LoopEndOperator) and executor.condition():
controller_interface.iteration_completed(IterationCompletedRequest(OperatorIdentity(executor.loop_start_id())))
executor.close()
# stop the data processing thread
self.data_processor.stop()
self.context.state_manager.transit_to(WorkerState.COMPLETED)
self.context.statistics_manager.update_total_execution_time(time.time_ns())
controller_interface = self._async_rpc_client.controller_stub()
controller_interface.worker_execution_completed(EmptyRequest())
self.context.close()

Expand Down Expand Up @@ -188,15 +192,18 @@ def process_input_state(self) -> None:
output_state = self.context.state_processing_manager.get_output_state()
self._switch_context()
if output_state is not None:
for to, batch in self.context.output_manager.emit_state(output_state):
self._output_queue.put(
DataElement(
tag=ChannelIdentity(
ActorVirtualIdentity(self.context.worker_id), to, False
),
payload=batch,
if isinstance(self.context.executor_manager.executor, LoopStartOperator):
output_state.add("LoopStartId", self.context.worker_id.split('-', 1)[1].rsplit('-main-0', 1)[0])
for to, batch in self.context.output_manager.emit_state(output_state):
self._output_queue.put(
DataElement(
tag=ChannelIdentity(
ActorVirtualIdentity(self.context.worker_id), to, False
),
payload=batch,
)
)
)
self.context.output_manager.save_state_to_storage_if_needed(output_state)

def process_tuple_with_udf(self) -> Iterator[Optional[Tuple]]:
"""
Expand Down Expand Up @@ -329,7 +336,7 @@ def _process_ecm(self, ecm_element: ECMElement):

if ecm.ecm_type != EmbeddedControlMessageType.NO_ALIGNMENT:
self.context.pause_manager.resume(PauseType.ECM_PAUSE)

self._switch_context()
if self.context.tuple_processing_manager.current_internal_marker:
{
StartChannel: self._process_start_channel,
Expand Down
107 changes: 59 additions & 48 deletions amber/src/main/python/core/storage/document_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,30 +61,35 @@ def create_document(uri: str, schema: Schema) -> VirtualDocument:
if parsed_uri.scheme == VFSURIFactory.VFS_FILE_URI_SCHEME:
_, _, _, resource_type = VFSURIFactory.decode_uri(uri)

if resource_type in {VFSResourceType.RESULT}:
storage_key = DocumentFactory.sanitize_uri_path(parsed_uri)

# Convert Amber Schema to Iceberg Schema with LARGE_BINARY
# field name encoding
iceberg_schema = amber_schema_to_iceberg_schema(schema)

create_table(
IcebergCatalogInstance.get_instance(),
StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE,
storage_key,
iceberg_schema,
override_if_exists=True,
)

return IcebergDocument[Tuple](
StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE,
storage_key,
iceberg_schema,
amber_tuples_to_arrow_table,
arrow_table_to_amber_tuples,
)
else:
raise ValueError(f"Resource type {resource_type} is not supported")
match resource_type:
case VFSResourceType.RESULT:
namespace = StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE
case VFSResourceType.STATE:
namespace = "state"
case _:
raise ValueError(f"Resource type {resource_type} is not supported")

storage_key = DocumentFactory.sanitize_uri_path(parsed_uri)
# Convert Amber Schema to Iceberg Schema with LARGE_BINARY
# field name encoding
iceberg_schema = amber_schema_to_iceberg_schema(schema)

create_table(
IcebergCatalogInstance.get_instance(),
namespace,
storage_key,
iceberg_schema,
override_if_exists=True,
)

return IcebergDocument[Tuple](
namespace,
storage_key,
iceberg_schema,
amber_tuples_to_arrow_table,
arrow_table_to_amber_tuples,
)

else:
raise NotImplementedError(
f"Unsupported URI scheme: {parsed_uri.scheme} for creating the document"
Expand All @@ -96,30 +101,36 @@ def open_document(uri: str) -> typing.Tuple[VirtualDocument, Optional[Schema]]:
if parsed_uri.scheme == "vfs":
_, _, _, resource_type = VFSURIFactory.decode_uri(uri)

if resource_type in {VFSResourceType.RESULT}:
storage_key = DocumentFactory.sanitize_uri_path(parsed_uri)

table = load_table_metadata(
IcebergCatalogInstance.get_instance(),
StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE,
storage_key,
)

if table is None:
raise ValueError("No storage is found for the given URI")

amber_schema = Schema(table.schema().as_arrow())

document = IcebergDocument(
StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE,
storage_key,
table.schema(),
amber_tuples_to_arrow_table,
arrow_table_to_amber_tuples,
)
return document, amber_schema
else:
raise ValueError(f"Resource type {resource_type} is not supported")
match resource_type:
case VFSResourceType.RESULT:
namespace = StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE
case VFSResourceType.STATE:
namespace = "state"
case _:
raise ValueError(f"Resource type {resource_type} is not supported")

storage_key = DocumentFactory.sanitize_uri_path(parsed_uri)

table = load_table_metadata(
IcebergCatalogInstance.get_instance(),
namespace,
storage_key,
)

if table is None:
raise ValueError("No storage is found for the given URI")

amber_schema = Schema(table.schema().as_arrow())

document = IcebergDocument(
namespace,
storage_key,
table.schema(),
amber_tuples_to_arrow_table,
arrow_table_to_amber_tuples,
)
return document, amber_schema

else:
raise NotImplementedError(
f"Unsupported URI scheme: {parsed_uri.scheme} for opening the document"
Expand Down
3 changes: 1 addition & 2 deletions amber/src/main/python/core/storage/iceberg/iceberg_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ def create_postgres_catalog(
catalog_name,
**{
"uri": f"postgresql+pg8000://{username}:{password}@{uri_without_scheme}",
"warehouse": f"file://{warehouse_path}",
"warehouse": warehouse_path,
},
)

Expand Down Expand Up @@ -180,7 +180,6 @@ def create_table(

if catalog.table_exists(identifier) and override_if_exists:
catalog.drop_table(identifier)

table = catalog.create_table(
identifier=identifier,
schema=table_schema,
Expand Down
Loading
Loading