diff --git a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto index d714f64a154..74dd1ac6f75 100644 --- a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto +++ b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto @@ -46,6 +46,7 @@ message ControlRequest { PortCompletedRequest portCompletedRequest = 9; WorkerStateUpdatedRequest workerStateUpdatedRequest = 10; LinkWorkersRequest linkWorkersRequest = 11; + IterationCompletedRequest iterationCompletedRequest = 12; // request for worker AddInputChannelRequest addInputChannelRequest = 50; @@ -58,6 +59,7 @@ message ControlRequest { PrepareCheckpointRequest prepareCheckpointRequest = 57; QueryStatisticsRequest queryStatisticsRequest = 58; + // request for testing Ping ping = 100; Pong pong = 101; @@ -278,4 +280,8 @@ enum StatisticsUpdateTarget { message QueryStatisticsRequest{ repeated core.ActorVirtualIdentity filterByWorkers = 1; StatisticsUpdateTarget updateTarget = 2; +} + +message IterationCompletedRequest{ + core.OperatorIdentity LoopStartId = 1 [(scalapb.field).no_box = true]; } \ No newline at end of file diff --git a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controllerservice.proto b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controllerservice.proto index 70d189a3411..2f6066c5713 100644 --- a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controllerservice.proto +++ b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controllerservice.proto @@ -42,6 +42,7 @@ service ControllerService { rpc PauseWorkflow(EmptyRequest) returns (EmptyReturn); rpc WorkerStateUpdated(WorkerStateUpdatedRequest) returns (EmptyReturn); rpc WorkerExecutionCompleted(EmptyRequest) returns (EmptyReturn); + rpc 
IterationCompleted(IterationCompletedRequest) returns (EmptyReturn); rpc LinkWorkers(LinkWorkersRequest) returns (EmptyReturn); rpc ControllerInitiateQueryStatistics(QueryStatisticsRequest) returns (EmptyReturn); rpc RetryWorkflow(RetryWorkflowRequest) returns (EmptyReturn); diff --git a/amber/src/main/python/core/architecture/packaging/input_manager.py b/amber/src/main/python/core/architecture/packaging/input_manager.py index 6cb6bdc08c4..dfbf462e653 100644 --- a/amber/src/main/python/core/architecture/packaging/input_manager.py +++ b/amber/src/main/python/core/architecture/packaging/input_manager.py @@ -173,3 +173,6 @@ def _process_data(self, table: Table) -> Iterator[Tuple]: yield Tuple( {name: field_accessor for name in table.column_names}, schema=schema ) + + def get_input_state_uri(self): + return next(iter(self._input_port_mat_reader_runnables.values()))[0].uri.replace("/result", "/state") \ No newline at end of file diff --git a/amber/src/main/python/core/architecture/packaging/output_manager.py b/amber/src/main/python/core/architecture/packaging/output_manager.py index bf4afbf396f..d92e8dcc987 100644 --- a/amber/src/main/python/core/architecture/packaging/output_manager.py +++ b/amber/src/main/python/core/architecture/packaging/output_manager.py @@ -87,6 +87,8 @@ def __init__(self, worker_id: str): PortIdentity, typing.Tuple[Queue, PortStorageWriter, Thread] ] = dict() + self._storage_uris: typing.Dict[PortIdentity, str] = dict() + def is_missing_output_ports(self): """ This method is only used for ensuring correct region execution. @@ -126,6 +128,7 @@ def set_up_port_storage_writer(self, port_id: PortIdentity, storage_uri: str): Create a separate thread for saving output tuples of a port to storage in batch. 
""" + self._storage_uris[port_id] = storage_uri document, _ = DocumentFactory.open_document(storage_uri) buffered_item_writer = document.writer(str(get_worker_index(self.worker_id))) writer_queue = Queue() @@ -171,6 +174,21 @@ def save_tuple_to_storage_if_needed(self, tuple_: Tuple, port_id=None) -> None: PortStorageWriterElement(data_tuple=tuple_) ) + def save_state_to_storage_if_needed(self, state: State, port_id=None) -> None: + if port_id is None: + uris = self._storage_uris.values() + elif port_id in self._storage_uris: + uris = [self._storage_uris[port_id]] + else: + return + + for uri in uris: + writer = DocumentFactory.create_document( + uri.replace("/result", "/state"), state.schema + ).writer(str(get_worker_index(self.worker_id))) + writer.put_one(Tuple(vars(state))) + writer.close() + def close_port_storage_writers(self) -> None: """ Flush the buffers of port storage writers and wait for all the diff --git a/amber/src/main/python/core/models/operator.py b/amber/src/main/python/core/models/operator.py index 79050839958..bd6e94203d7 100644 --- a/amber/src/main/python/core/models/operator.py +++ b/amber/src/main/python/core/models/operator.py @@ -293,3 +293,42 @@ def process_table(self, table: Table, port: int) -> Iterator[Optional[TableLike] time, or None. 
""" yield + + +class LoopStartOperator(TableOperator): + def open(self) -> None: + pass + + @abstractmethod + def process_table(self, table: Table, port: int) -> Iterator[Optional[TableLike]]: + yield + + @overrides.final + def produce_state_on_finish(self, port: int) -> State: + from pickle import dumps + self.state["table"] = dumps(Table(self._TableOperator__table_data[port])) + return State().from_dict(self.state) + + def close(self) -> None: + pass + + +class LoopEndOperator(TableOperator): + def open(self) -> None: + pass + + @overrides.final + def process_table(self, table: Table, port: int) -> Iterator[Optional[TableLike]]: + yield table + + def close(self) -> None: + pass + + @abstractmethod + def condition(self) -> None: + pass + + def loop_start_id(self) -> str: + del self.state["table"] + del self.state["output"] + return self.state["LoopStartId"] diff --git a/amber/src/main/python/core/models/state.py b/amber/src/main/python/core/models/state.py index 2c8a268dfb7..2036065483a 100644 --- a/amber/src/main/python/core/models/state.py +++ b/amber/src/main/python/core/models/state.py @@ -27,7 +27,7 @@ @dataclass class State: def __init__( - self, table: Optional[Table] = None, pass_to_all_downstream: bool = False + self, table: Optional[Table] = None, pass_to_all_downstream: bool = True ): self.schema = Schema() self.passToAllDownstream = pass_to_all_downstream @@ -35,6 +35,20 @@ def __init__( self.__dict__.update(table.to_pandas().iloc[0].to_dict()) self.schema = Schema(table.schema) + @classmethod + def from_tuple(cls, tuple, schema): + obj = cls() + obj.__dict__.update(tuple.as_dict()) + obj.schema = schema + return obj + + @classmethod + def from_dict(cls, dictionary): + obj = cls() + for item in dictionary: + obj.add(item, dictionary[item]) + return obj + def add( self, key: str, value: any, value_type: Optional[AttributeType] = None ) -> None: @@ -53,6 +67,12 @@ def to_table(self) -> Table: schema=self.schema.as_arrow_schema(), ) + def to_dict(self) -> 
dict:
+        dictionary = dict(self.__dict__)
+        del dictionary["passToAllDownstream"]
+        del dictionary["schema"]
+        return dictionary
+
     def __setattr__(self, key: str, value: any) -> None:
         self.add(key, value)
 
diff --git a/amber/src/main/python/core/runnables/data_processor.py b/amber/src/main/python/core/runnables/data_processor.py
index 4399b1a3a2f..815e85a6446 100644
--- a/amber/src/main/python/core/runnables/data_processor.py
+++ b/amber/src/main/python/core/runnables/data_processor.py
@@ -100,6 +100,7 @@ def process_state(self, state: State) -> None:
                 self._context.worker_id,
                 self._context.console_message_manager.print_buf,
             ):
+                self._switch_context()
             self._set_output_state(executor.process_state(state, port_id))
 
         except Exception as err:
diff --git a/amber/src/main/python/core/runnables/main_loop.py b/amber/src/main/python/core/runnables/main_loop.py
index d73c655734f..b8c12787951 100644
--- a/amber/src/main/python/core/runnables/main_loop.py
+++ b/amber/src/main/python/core/runnables/main_loop.py
@@ -18,6 +18,8 @@
 import threading
 import time
 import typing
+import uuid
+
 from loguru import logger
 from overrides import overrides
 from pampy import match
@@ -38,8 +40,10 @@
     ECMElement,
     InternalQueueElement,
 )
+from core.models.operator import LoopEndOperator, LoopStartOperator
 from core.models.state import State
 from core.runnables.data_processor import DataProcessor
+from core.storage.document_factory import DocumentFactory
 from core.util import StoppableQueueBlockingRunnable, get_one_of
 from core.util.console_message.timestamp import current_time_in_local_timezone
 from core.util.customized_queue.queue_base import QueueElement
@@ -47,7 +51,7 @@
     ActorVirtualIdentity,
     PortIdentity,
     ChannelIdentity,
-    EmbeddedControlMessageIdentity,
+    EmbeddedControlMessageIdentity, OperatorIdentity,
 )
 from proto.org.apache.texera.amber.engine.architecture.rpc import (
     ConsoleMessage,
@@ -60,7 +64,7 @@
     EmbeddedControlMessageType,
     EmbeddedControlMessage,
     AsyncRpcContext,
-    ControlRequest,
+    
ControlRequest, IterationCompletedRequest,
 )
 from proto.org.apache.texera.amber.engine.architecture.worker import (
     WorkerState,
@@ -94,12 +98,22 @@ def complete(self) -> None:
         """
         # flush the buffered console prints
         self._check_and_report_console_messages(force_flush=True)
-        self.context.executor_manager.executor.close()
+        controller_interface = self._async_rpc_client.controller_stub()
+        executor = self.context.executor_manager.executor
+        if isinstance(executor, LoopEndOperator) and executor.condition():
+            controller_interface.iteration_completed(IterationCompletedRequest(OperatorIdentity(executor.loop_start_id())))
+            uri = executor.state["LoopStartStateURI"]
+            del executor.state["LoopStartStateURI"]
+            del executor.state["LoopStartId"]
+            state = State.from_dict(executor.state)
+            writer = DocumentFactory.create_document(uri, state.schema).writer(str(uuid.uuid4()))
+            writer.put_one(Tuple({k: v for k, v in vars(state).items() if k not in ("schema", "passToAllDownstream")}))
+            writer.close()
+        executor.close()
         # stop the data processing thread
         self.data_processor.stop()
         self.context.state_manager.transit_to(WorkerState.COMPLETED)
         self.context.statistics_manager.update_total_execution_time(time.time_ns())
-        controller_interface = self._async_rpc_client.controller_stub()
         controller_interface.worker_execution_completed(EmptyRequest())
         self.context.close()
@@ -188,15 +202,19 @@ def process_input_state(self) -> None:
             output_state = self.context.state_processing_manager.get_output_state()
             self._switch_context()
             if output_state is not None:
-                for to, batch in self.context.output_manager.emit_state(output_state):
-                    self._output_queue.put(
-                        DataElement(
-                            tag=ChannelIdentity(
-                                ActorVirtualIdentity(self.context.worker_id), to, False
-                            ),
-                            payload=batch,
+                if isinstance(self.context.executor_manager.executor, LoopStartOperator):
+                    output_state.add("LoopStartId", self.context.worker_id.split('-', 1)[1].rsplit('-main-0', 1)[0])
+                    output_state.add("LoopStartStateURI", self.context.input_manager.get_input_state_uri())
+                for to, batch in 
self.context.output_manager.emit_state(output_state): + self._output_queue.put( + DataElement( + tag=ChannelIdentity( + ActorVirtualIdentity(self.context.worker_id), to, False + ), + payload=batch, + ) ) - ) + self.context.output_manager.save_state_to_storage_if_needed(output_state) def process_tuple_with_udf(self) -> Iterator[Optional[Tuple]]: """ @@ -329,7 +347,7 @@ def _process_ecm(self, ecm_element: ECMElement): if ecm.ecm_type != EmbeddedControlMessageType.NO_ALIGNMENT: self.context.pause_manager.resume(PauseType.ECM_PAUSE) - + self._switch_context() if self.context.tuple_processing_manager.current_internal_marker: { StartChannel: self._process_start_channel, diff --git a/amber/src/main/python/core/storage/document_factory.py b/amber/src/main/python/core/storage/document_factory.py index 9b686ab66b6..8a4d6fe3c5f 100644 --- a/amber/src/main/python/core/storage/document_factory.py +++ b/amber/src/main/python/core/storage/document_factory.py @@ -61,30 +61,35 @@ def create_document(uri: str, schema: Schema) -> VirtualDocument: if parsed_uri.scheme == VFSURIFactory.VFS_FILE_URI_SCHEME: _, _, _, resource_type = VFSURIFactory.decode_uri(uri) - if resource_type in {VFSResourceType.RESULT}: - storage_key = DocumentFactory.sanitize_uri_path(parsed_uri) - - # Convert Amber Schema to Iceberg Schema with LARGE_BINARY - # field name encoding - iceberg_schema = amber_schema_to_iceberg_schema(schema) - - create_table( - IcebergCatalogInstance.get_instance(), - StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE, - storage_key, - iceberg_schema, - override_if_exists=True, - ) - - return IcebergDocument[Tuple]( - StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE, - storage_key, - iceberg_schema, - amber_tuples_to_arrow_table, - arrow_table_to_amber_tuples, - ) - else: - raise ValueError(f"Resource type {resource_type} is not supported") + match resource_type: + case VFSResourceType.RESULT: + namespace = StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE + case VFSResourceType.STATE: + namespace 
= "state" + case _: + raise ValueError(f"Resource type {resource_type} is not supported") + + storage_key = DocumentFactory.sanitize_uri_path(parsed_uri) + # Convert Amber Schema to Iceberg Schema with LARGE_BINARY + # field name encoding + iceberg_schema = amber_schema_to_iceberg_schema(schema) + + create_table( + IcebergCatalogInstance.get_instance(), + namespace, + storage_key, + iceberg_schema, + override_if_exists=True, + ) + + return IcebergDocument[Tuple]( + namespace, + storage_key, + iceberg_schema, + amber_tuples_to_arrow_table, + arrow_table_to_amber_tuples, + ) + else: raise NotImplementedError( f"Unsupported URI scheme: {parsed_uri.scheme} for creating the document" @@ -96,30 +101,36 @@ def open_document(uri: str) -> typing.Tuple[VirtualDocument, Optional[Schema]]: if parsed_uri.scheme == "vfs": _, _, _, resource_type = VFSURIFactory.decode_uri(uri) - if resource_type in {VFSResourceType.RESULT}: - storage_key = DocumentFactory.sanitize_uri_path(parsed_uri) - - table = load_table_metadata( - IcebergCatalogInstance.get_instance(), - StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE, - storage_key, - ) - - if table is None: - raise ValueError("No storage is found for the given URI") - - amber_schema = Schema(table.schema().as_arrow()) - - document = IcebergDocument( - StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE, - storage_key, - table.schema(), - amber_tuples_to_arrow_table, - arrow_table_to_amber_tuples, - ) - return document, amber_schema - else: - raise ValueError(f"Resource type {resource_type} is not supported") + match resource_type: + case VFSResourceType.RESULT: + namespace = StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE + case VFSResourceType.STATE: + namespace = "state" + case _: + raise ValueError(f"Resource type {resource_type} is not supported") + + storage_key = DocumentFactory.sanitize_uri_path(parsed_uri) + + table = load_table_metadata( + IcebergCatalogInstance.get_instance(), + namespace, + storage_key, + ) + + if table is None: + raise 
ValueError("No storage is found for the given URI") + + amber_schema = Schema(table.schema().as_arrow()) + + document = IcebergDocument( + namespace, + storage_key, + table.schema(), + amber_tuples_to_arrow_table, + arrow_table_to_amber_tuples, + ) + return document, amber_schema + else: raise NotImplementedError( f"Unsupported URI scheme: {parsed_uri.scheme} for opening the document" diff --git a/amber/src/main/python/core/storage/iceberg/iceberg_utils.py b/amber/src/main/python/core/storage/iceberg/iceberg_utils.py index 9e17b2e0e82..cca785beb69 100644 --- a/amber/src/main/python/core/storage/iceberg/iceberg_utils.py +++ b/amber/src/main/python/core/storage/iceberg/iceberg_utils.py @@ -148,7 +148,7 @@ def create_postgres_catalog( catalog_name, **{ "uri": f"postgresql+pg8000://{username}:{password}@{uri_without_scheme}", - "warehouse": f"file://{warehouse_path}", + "warehouse": warehouse_path, }, ) @@ -180,7 +180,6 @@ def create_table( if catalog.table_exists(identifier) and override_if_exists: catalog.drop_table(identifier) - table = catalog.create_table( identifier=identifier, schema=table_schema, diff --git a/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py b/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py index e49c0316cc7..ad07be4e023 100644 --- a/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py +++ b/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py @@ -17,8 +17,8 @@ import typing from loguru import logger -from pyarrow import Table from typing import Union +from pyarrow import Table from core.architecture.sendsemantics.broad_cast_partitioner import ( BroadcastPartitioner, @@ -34,7 +34,7 @@ from core.architecture.sendsemantics.round_robin_partitioner import ( RoundRobinPartitioner, ) -from core.models import Tuple, InternalQueue, DataFrame, DataPayload +from core.models import Tuple, 
InternalQueue, DataFrame, DataPayload, State, StateFrame from core.models.internal_queue import DataElement, ECMElement from core.storage.document_factory import DocumentFactory from core.util import Stoppable, get_one_of @@ -125,6 +125,15 @@ def tuple_to_batch_with_filter(self, tuple_: Tuple) -> typing.Iterator[DataFrame if receiver == self.worker_actor_id: yield self.tuples_to_data_frame(tuples) + def emit_state_with_filter(self, state: State) -> typing.Iterator[StateFrame]: + for receiver, payload in self.partitioner.flush_state(state): + if receiver == self.worker_actor_id: + yield ( + StateFrame(payload) + if isinstance(payload, State) + else self.tuples_to_data_frame(payload) + ) + def run(self) -> None: """ Main execution logic that reads tuples from the materialized storage and @@ -138,8 +147,21 @@ def run(self) -> None: self.uri ) self.emit_ecm("StartChannel", EmbeddedControlMessageType.NO_ALIGNMENT) - storage_iterator = self.materialization.get() + try: + state_document, state_schema = DocumentFactory.open_document( + self.uri.replace("/result", "/state") + ) + state_iterator = state_document.get() + for state in state_iterator: + for state_frame in self.emit_state_with_filter( + State.from_tuple(state, state_schema) + ): + self.emit_payload(state_frame) + except ValueError: + pass + + storage_iterator = self.materialization.get() # Iterate and process tuples. 
for tup in storage_iterator: if self._stopped: @@ -149,6 +171,8 @@ def run(self) -> None: tup.cast_to_schema(self.tuple_schema) for data_frame in self.tuple_to_batch_with_filter(tup): self.emit_payload(data_frame) + + self.emit_ecm("EndChannel", EmbeddedControlMessageType.PORT_ALIGNMENT) self._finished = True except Exception as err: diff --git a/amber/src/main/python/core/storage/vfs_uri_factory.py b/amber/src/main/python/core/storage/vfs_uri_factory.py index de0c5db56ec..0e23e607055 100644 --- a/amber/src/main/python/core/storage/vfs_uri_factory.py +++ b/amber/src/main/python/core/storage/vfs_uri_factory.py @@ -34,6 +34,7 @@ class VFSResourceType(str, Enum): RESULT = "result" RUNTIME_STATISTICS = "runtimeStatistics" CONSOLE_MESSAGES = "consoleMessages" + STATE = "state" class VFSURIFactory: diff --git a/amber/src/main/python/core/util/console_message/replace_print.py b/amber/src/main/python/core/util/console_message/replace_print.py index 7feeeeb52d0..0df200fac4b 100644 --- a/amber/src/main/python/core/util/console_message/replace_print.py +++ b/amber/src/main/python/core/util/console_message/replace_print.py @@ -58,30 +58,7 @@ def __enter__(self) -> None: :return: """ - def wrapped_print(*args, **kwargs): - # use StringIO to obtain the written complete string from the original - # print function. 
- if "file" in kwargs: - self.builtins_print(*args, **kwargs) - return - with StringIO() as tmp_buf, redirect_stdout(tmp_buf): - self.builtins_print(*args, **kwargs) - complete_str = tmp_buf.getvalue() - console_message = ConsoleMessage( - worker_id=self.worker_id, - timestamp=current_time_in_local_timezone(), - msg_type=ConsoleMessageType.PRINT, - source=( - f"{inspect.currentframe().f_back.f_globals['__name__']}" - f":{inspect.currentframe().f_back.f_code.co_name}" - f":{inspect.currentframe().f_back.f_lineno}" - ), - title=complete_str, - message="", - ) - self.buf.put(console_message) - - builtins.print = wrapped_print + pass def __exit__(self, exc_type, exc_val, exc_tb) -> bool: """ diff --git a/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/rpc/__init__.py b/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/rpc/__init__.py index ea6ddc5e43f..ec19b581a66 100644 --- a/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/rpc/__init__.py +++ b/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/rpc/__init__.py @@ -43,6 +43,12 @@ class ConsoleMessageType(betterproto.Enum): DEBUGGER = 3 +class StatisticsUpdateTarget(betterproto.Enum): + BOTH_UI_AND_PERSISTENCE = 0 + UI_ONLY = 1 + PERSISTENCE_ONLY = 2 + + class ErrorLanguage(betterproto.Enum): PYTHON = 0 SCALA = 1 @@ -96,6 +102,9 @@ class ControlRequest(betterproto.Message): link_workers_request: "LinkWorkersRequest" = betterproto.message_field( 11, group="sealed_value" ) + iteration_completed_request: "IterationCompletedRequest" = ( + betterproto.message_field(12, group="sealed_value") + ) add_input_channel_request: "AddInputChannelRequest" = betterproto.message_field( 50, group="sealed_value" ) @@ -385,6 +394,12 @@ class QueryStatisticsRequest(betterproto.Message): filter_by_workers: List["___core__.ActorVirtualIdentity"] = ( betterproto.message_field(1) ) + update_target: "StatisticsUpdateTarget" = betterproto.enum_field(2) + + 
+@dataclass(eq=False, repr=False) +class IterationCompletedRequest(betterproto.Message): + loop_start_id: "___core__.OperatorIdentity" = betterproto.message_field(1) @dataclass(eq=False, repr=False) @@ -517,503 +532,522 @@ class WorkerMetricsResponse(betterproto.Message): metrics: "_worker__.WorkerMetrics" = betterproto.message_field(1) -class RpcTesterStub(betterproto.ServiceStub): - async def send_ping( +class ControllerServiceStub(betterproto.ServiceStub): + async def retrieve_workflow_state( self, - ping: "Ping", + empty_request: "EmptyRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None - ) -> "IntResponse": + ) -> "RetrieveWorkflowStateResponse": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPing", - ping, - IntResponse, + "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/RetrieveWorkflowState", + empty_request, + RetrieveWorkflowStateResponse, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def send_pong( + async def propagate_embedded_control_message( self, - pong: "Pong", + propagate_embedded_control_message_request: "PropagateEmbeddedControlMessageRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None - ) -> "IntResponse": + ) -> "PropagateEmbeddedControlMessageResponse": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPong", - pong, - IntResponse, + "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PropagateEmbeddedControlMessage", + propagate_embedded_control_message_request, + PropagateEmbeddedControlMessageResponse, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def send_nested( + async def take_global_checkpoint( self, - nested: "Nested", + take_global_checkpoint_request: "TakeGlobalCheckpointRequest", *, timeout: Optional[float] = None, 
deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None - ) -> "StringResponse": + ) -> "TakeGlobalCheckpointResponse": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendNested", - nested, - StringResponse, + "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/TakeGlobalCheckpoint", + take_global_checkpoint_request, + TakeGlobalCheckpointResponse, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def send_pass( + async def debug_command( self, - pass_: "Pass", + debug_command_request: "DebugCommandRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None - ) -> "StringResponse": + ) -> "EmptyReturn": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPass", - pass_, - StringResponse, + "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/DebugCommand", + debug_command_request, + EmptyReturn, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def send_error_command( + async def evaluate_python_expression( self, - error_command: "ErrorCommand", + evaluate_python_expression_request: "EvaluatePythonExpressionRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None - ) -> "StringResponse": + ) -> "EvaluatePythonExpressionResponse": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendErrorCommand", - error_command, - StringResponse, + "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/EvaluatePythonExpression", + evaluate_python_expression_request, + EvaluatePythonExpressionResponse, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def send_recursion( + async def console_message_triggered( self, - recursion: "Recursion", + console_message_triggered_request: 
"ConsoleMessageTriggeredRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None - ) -> "StringResponse": + ) -> "EmptyReturn": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendRecursion", - recursion, - StringResponse, + "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ConsoleMessageTriggered", + console_message_triggered_request, + EmptyReturn, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def send_collect( + async def port_completed( self, - collect: "Collect", + port_completed_request: "PortCompletedRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None - ) -> "StringResponse": + ) -> "EmptyReturn": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendCollect", - collect, - StringResponse, + "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PortCompleted", + port_completed_request, + EmptyReturn, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def send_generate_number( + async def start_workflow( self, - generate_number: "GenerateNumber", + empty_request: "EmptyRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None - ) -> "IntResponse": + ) -> "StartWorkflowResponse": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendGenerateNumber", - generate_number, - IntResponse, + "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/StartWorkflow", + empty_request, + StartWorkflowResponse, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def send_multi_call( + async def resume_workflow( self, - multi_call: "MultiCall", + empty_request: "EmptyRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = 
None, metadata: Optional["MetadataLike"] = None - ) -> "StringResponse": + ) -> "EmptyReturn": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendMultiCall", - multi_call, - StringResponse, + "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ResumeWorkflow", + empty_request, + EmptyReturn, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def send_chain( + async def pause_workflow( self, - chain: "Chain", + empty_request: "EmptyRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None - ) -> "StringResponse": + ) -> "EmptyReturn": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendChain", - chain, - StringResponse, + "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PauseWorkflow", + empty_request, + EmptyReturn, timeout=timeout, deadline=deadline, metadata=metadata, ) - -class WorkerServiceStub(betterproto.ServiceStub): - async def add_input_channel( + async def worker_state_updated( self, - add_input_channel_request: "AddInputChannelRequest", + worker_state_updated_request: "WorkerStateUpdatedRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None ) -> "EmptyReturn": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AddInputChannel", - add_input_channel_request, + "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/WorkerStateUpdated", + worker_state_updated_request, EmptyReturn, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def add_partitioning( + async def worker_execution_completed( self, - add_partitioning_request: "AddPartitioningRequest", + empty_request: "EmptyRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None ) -> 
"EmptyReturn": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AddPartitioning", - add_partitioning_request, + "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/WorkerExecutionCompleted", + empty_request, EmptyReturn, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def assign_port( + async def iteration_completed( self, - assign_port_request: "AssignPortRequest", + iteration_completed_request: "IterationCompletedRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None ) -> "EmptyReturn": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AssignPort", - assign_port_request, + "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/IterationCompleted", + iteration_completed_request, EmptyReturn, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def finalize_checkpoint( + async def link_workers( self, - finalize_checkpoint_request: "FinalizeCheckpointRequest", + link_workers_request: "LinkWorkersRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None - ) -> "FinalizeCheckpointResponse": + ) -> "EmptyReturn": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/FinalizeCheckpoint", - finalize_checkpoint_request, - FinalizeCheckpointResponse, + "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/LinkWorkers", + link_workers_request, + EmptyReturn, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def flush_network_buffer( + async def controller_initiate_query_statistics( self, - empty_request: "EmptyRequest", + query_statistics_request: "QueryStatisticsRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None ) -> "EmptyReturn": 
return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/FlushNetworkBuffer", - empty_request, + "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ControllerInitiateQueryStatistics", + query_statistics_request, EmptyReturn, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def initialize_executor( + async def retry_workflow( self, - initialize_executor_request: "InitializeExecutorRequest", + retry_workflow_request: "RetryWorkflowRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None ) -> "EmptyReturn": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/InitializeExecutor", - initialize_executor_request, + "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/RetryWorkflow", + retry_workflow_request, EmptyReturn, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def open_executor( + +class RpcTesterStub(betterproto.ServiceStub): + async def send_ping( self, - empty_request: "EmptyRequest", + ping: "Ping", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None - ) -> "EmptyReturn": + ) -> "IntResponse": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/OpenExecutor", - empty_request, - EmptyReturn, + "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPing", + ping, + IntResponse, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def pause_worker( + async def send_pong( self, - empty_request: "EmptyRequest", + pong: "Pong", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None - ) -> "WorkerStateResponse": + ) -> "IntResponse": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/PauseWorker", - 
empty_request, - WorkerStateResponse, + "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPong", + pong, + IntResponse, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def prepare_checkpoint( + async def send_nested( self, - prepare_checkpoint_request: "PrepareCheckpointRequest", + nested: "Nested", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None - ) -> "EmptyReturn": + ) -> "StringResponse": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/PrepareCheckpoint", - prepare_checkpoint_request, - EmptyReturn, + "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendNested", + nested, + StringResponse, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def query_statistics( + async def send_pass( self, - empty_request: "EmptyRequest", + pass_: "Pass", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None - ) -> "WorkerMetricsResponse": + ) -> "StringResponse": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/QueryStatistics", - empty_request, - WorkerMetricsResponse, + "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPass", + pass_, + StringResponse, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def resume_worker( + async def send_error_command( self, - empty_request: "EmptyRequest", + error_command: "ErrorCommand", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None - ) -> "WorkerStateResponse": + ) -> "StringResponse": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/ResumeWorker", - empty_request, - WorkerStateResponse, + "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendErrorCommand", + error_command, + StringResponse, 
timeout=timeout, deadline=deadline, metadata=metadata, ) - async def retrieve_state( + async def send_recursion( self, - empty_request: "EmptyRequest", + recursion: "Recursion", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None - ) -> "EmptyReturn": + ) -> "StringResponse": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/RetrieveState", - empty_request, - EmptyReturn, + "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendRecursion", + recursion, + StringResponse, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def retry_current_tuple( + async def send_collect( self, - empty_request: "EmptyRequest", + collect: "Collect", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None - ) -> "EmptyReturn": + ) -> "StringResponse": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/RetryCurrentTuple", - empty_request, - EmptyReturn, + "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendCollect", + collect, + StringResponse, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def start_worker( + async def send_generate_number( self, - empty_request: "EmptyRequest", + generate_number: "GenerateNumber", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None - ) -> "WorkerStateResponse": + ) -> "IntResponse": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/StartWorker", - empty_request, - WorkerStateResponse, + "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendGenerateNumber", + generate_number, + IntResponse, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def end_worker( + async def send_multi_call( self, - empty_request: "EmptyRequest", + multi_call: "MultiCall", 
*, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None - ) -> "EmptyReturn": + ) -> "StringResponse": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EndWorker", - empty_request, - EmptyReturn, + "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendMultiCall", + multi_call, + StringResponse, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def start_channel( + async def send_chain( self, - empty_request: "EmptyRequest", + chain: "Chain", + *, + timeout: Optional[float] = None, + deadline: Optional["Deadline"] = None, + metadata: Optional["MetadataLike"] = None + ) -> "StringResponse": + return await self._unary_unary( + "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendChain", + chain, + StringResponse, + timeout=timeout, + deadline=deadline, + metadata=metadata, + ) + + +class WorkerServiceStub(betterproto.ServiceStub): + async def add_input_channel( + self, + add_input_channel_request: "AddInputChannelRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None ) -> "EmptyReturn": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/StartChannel", - empty_request, + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AddInputChannel", + add_input_channel_request, EmptyReturn, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def end_channel( + async def add_partitioning( self, - empty_request: "EmptyRequest", + add_partitioning_request: "AddPartitioningRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None ) -> "EmptyReturn": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EndChannel", - empty_request, + 
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AddPartitioning", + add_partitioning_request, EmptyReturn, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def debug_command( + async def assign_port( self, - debug_command_request: "DebugCommandRequest", + assign_port_request: "AssignPortRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None ) -> "EmptyReturn": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/DebugCommand", - debug_command_request, + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AssignPort", + assign_port_request, EmptyReturn, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def evaluate_python_expression( + async def finalize_checkpoint( self, - evaluate_python_expression_request: "EvaluatePythonExpressionRequest", + finalize_checkpoint_request: "FinalizeCheckpointRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None - ) -> "EvaluatedValue": + ) -> "FinalizeCheckpointResponse": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EvaluatePythonExpression", - evaluate_python_expression_request, - EvaluatedValue, + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/FinalizeCheckpoint", + finalize_checkpoint_request, + FinalizeCheckpointResponse, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def no_operation( + async def flush_network_buffer( self, empty_request: "EmptyRequest", *, @@ -1022,7 +1056,7 @@ async def no_operation( metadata: Optional["MetadataLike"] = None ) -> "EmptyReturn": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/NoOperation", + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/FlushNetworkBuffer", empty_request, EmptyReturn, 
timeout=timeout, @@ -1030,162 +1064,160 @@ async def no_operation( metadata=metadata, ) - -class ControllerServiceStub(betterproto.ServiceStub): - async def retrieve_workflow_state( + async def initialize_executor( self, - empty_request: "EmptyRequest", + initialize_executor_request: "InitializeExecutorRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None - ) -> "RetrieveWorkflowStateResponse": + ) -> "EmptyReturn": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/RetrieveWorkflowState", - empty_request, - RetrieveWorkflowStateResponse, + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/InitializeExecutor", + initialize_executor_request, + EmptyReturn, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def propagate_embedded_control_message( + async def open_executor( self, - propagate_embedded_control_message_request: "PropagateEmbeddedControlMessageRequest", + empty_request: "EmptyRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None - ) -> "PropagateEmbeddedControlMessageResponse": + ) -> "EmptyReturn": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PropagateEmbeddedControlMessage", - propagate_embedded_control_message_request, - PropagateEmbeddedControlMessageResponse, + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/OpenExecutor", + empty_request, + EmptyReturn, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def take_global_checkpoint( + async def pause_worker( self, - take_global_checkpoint_request: "TakeGlobalCheckpointRequest", + empty_request: "EmptyRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None - ) -> "TakeGlobalCheckpointResponse": + ) -> "WorkerStateResponse": 
return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/TakeGlobalCheckpoint", - take_global_checkpoint_request, - TakeGlobalCheckpointResponse, + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/PauseWorker", + empty_request, + WorkerStateResponse, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def debug_command( + async def prepare_checkpoint( self, - debug_command_request: "DebugCommandRequest", + prepare_checkpoint_request: "PrepareCheckpointRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None ) -> "EmptyReturn": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/DebugCommand", - debug_command_request, + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/PrepareCheckpoint", + prepare_checkpoint_request, EmptyReturn, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def evaluate_python_expression( + async def query_statistics( self, - evaluate_python_expression_request: "EvaluatePythonExpressionRequest", + empty_request: "EmptyRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None - ) -> "EvaluatePythonExpressionResponse": + ) -> "WorkerMetricsResponse": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/EvaluatePythonExpression", - evaluate_python_expression_request, - EvaluatePythonExpressionResponse, + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/QueryStatistics", + empty_request, + WorkerMetricsResponse, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def console_message_triggered( + async def resume_worker( self, - console_message_triggered_request: "ConsoleMessageTriggeredRequest", + empty_request: "EmptyRequest", *, timeout: Optional[float] = None, deadline: 
Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None - ) -> "EmptyReturn": + ) -> "WorkerStateResponse": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ConsoleMessageTriggered", - console_message_triggered_request, - EmptyReturn, + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/ResumeWorker", + empty_request, + WorkerStateResponse, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def port_completed( + async def retrieve_state( self, - port_completed_request: "PortCompletedRequest", + empty_request: "EmptyRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None ) -> "EmptyReturn": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PortCompleted", - port_completed_request, + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/RetrieveState", + empty_request, EmptyReturn, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def start_workflow( + async def retry_current_tuple( self, empty_request: "EmptyRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None - ) -> "StartWorkflowResponse": + ) -> "EmptyReturn": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/StartWorkflow", + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/RetryCurrentTuple", empty_request, - StartWorkflowResponse, + EmptyReturn, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def resume_workflow( + async def start_worker( self, empty_request: "EmptyRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None - ) -> "EmptyReturn": + ) -> "WorkerStateResponse": return await self._unary_unary( - 
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ResumeWorkflow", + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/StartWorker", empty_request, - EmptyReturn, + WorkerStateResponse, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def pause_workflow( + async def end_worker( self, empty_request: "EmptyRequest", *, @@ -1194,7 +1226,7 @@ async def pause_workflow( metadata: Optional["MetadataLike"] = None ) -> "EmptyReturn": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PauseWorkflow", + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EndWorker", empty_request, EmptyReturn, timeout=timeout, @@ -1202,24 +1234,24 @@ async def pause_workflow( metadata=metadata, ) - async def worker_state_updated( + async def start_channel( self, - worker_state_updated_request: "WorkerStateUpdatedRequest", + empty_request: "EmptyRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None ) -> "EmptyReturn": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/WorkerStateUpdated", - worker_state_updated_request, + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/StartChannel", + empty_request, EmptyReturn, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def worker_execution_completed( + async def end_channel( self, empty_request: "EmptyRequest", *, @@ -1228,7 +1260,7 @@ async def worker_execution_completed( metadata: Optional["MetadataLike"] = None ) -> "EmptyReturn": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/WorkerExecutionCompleted", + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EndChannel", empty_request, EmptyReturn, timeout=timeout, @@ -1236,51 +1268,51 @@ async def worker_execution_completed( metadata=metadata, ) - async def link_workers( 
+ async def debug_command( self, - link_workers_request: "LinkWorkersRequest", + debug_command_request: "DebugCommandRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None ) -> "EmptyReturn": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/LinkWorkers", - link_workers_request, + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/DebugCommand", + debug_command_request, EmptyReturn, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def controller_initiate_query_statistics( + async def evaluate_python_expression( self, - query_statistics_request: "QueryStatisticsRequest", + evaluate_python_expression_request: "EvaluatePythonExpressionRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None - ) -> "EmptyReturn": + ) -> "EvaluatedValue": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ControllerInitiateQueryStatistics", - query_statistics_request, - EmptyReturn, + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EvaluatePythonExpression", + evaluate_python_expression_request, + EvaluatedValue, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def retry_workflow( + async def no_operation( self, - retry_workflow_request: "RetryWorkflowRequest", + empty_request: "EmptyRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None ) -> "EmptyReturn": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/RetryWorkflow", - retry_workflow_request, + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/NoOperation", + empty_request, EmptyReturn, timeout=timeout, deadline=deadline, @@ -1288,806 +1320,824 @@ async def retry_workflow( ) -class 
RpcTesterBase(ServiceBase): +class ControllerServiceBase(ServiceBase): - async def send_ping(self, ping: "Ping") -> "IntResponse": + async def retrieve_workflow_state( + self, empty_request: "EmptyRequest" + ) -> "RetrieveWorkflowStateResponse": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def send_pong(self, pong: "Pong") -> "IntResponse": + async def propagate_embedded_control_message( + self, + propagate_embedded_control_message_request: "PropagateEmbeddedControlMessageRequest", + ) -> "PropagateEmbeddedControlMessageResponse": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def send_nested(self, nested: "Nested") -> "StringResponse": + async def take_global_checkpoint( + self, take_global_checkpoint_request: "TakeGlobalCheckpointRequest" + ) -> "TakeGlobalCheckpointResponse": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def send_pass(self, pass_: "Pass") -> "StringResponse": + async def debug_command( + self, debug_command_request: "DebugCommandRequest" + ) -> "EmptyReturn": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def send_error_command( - self, error_command: "ErrorCommand" - ) -> "StringResponse": + async def evaluate_python_expression( + self, evaluate_python_expression_request: "EvaluatePythonExpressionRequest" + ) -> "EvaluatePythonExpressionResponse": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def send_recursion(self, recursion: "Recursion") -> "StringResponse": - raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) + async def console_message_triggered( + self, console_message_triggered_request: "ConsoleMessageTriggeredRequest" + ) -> "EmptyReturn": + raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def send_collect(self, collect: "Collect") -> "StringResponse": + async def port_completed( + self, port_completed_request: "PortCompletedRequest" + ) -> "EmptyReturn": raise 
grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def send_generate_number( - self, generate_number: "GenerateNumber" - ) -> "IntResponse": + async def start_workflow( + self, empty_request: "EmptyRequest" + ) -> "StartWorkflowResponse": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def send_multi_call(self, multi_call: "MultiCall") -> "StringResponse": + async def resume_workflow(self, empty_request: "EmptyRequest") -> "EmptyReturn": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def send_chain(self, chain: "Chain") -> "StringResponse": + async def pause_workflow(self, empty_request: "EmptyRequest") -> "EmptyReturn": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def __rpc_send_ping( - self, stream: "grpclib.server.Stream[Ping, IntResponse]" + async def worker_state_updated( + self, worker_state_updated_request: "WorkerStateUpdatedRequest" + ) -> "EmptyReturn": + raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) + + async def worker_execution_completed( + self, empty_request: "EmptyRequest" + ) -> "EmptyReturn": + raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) + + async def iteration_completed( + self, iteration_completed_request: "IterationCompletedRequest" + ) -> "EmptyReturn": + raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) + + async def link_workers( + self, link_workers_request: "LinkWorkersRequest" + ) -> "EmptyReturn": + raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) + + async def controller_initiate_query_statistics( + self, query_statistics_request: "QueryStatisticsRequest" + ) -> "EmptyReturn": + raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) + + async def retry_workflow( + self, retry_workflow_request: "RetryWorkflowRequest" + ) -> "EmptyReturn": + raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) + + async def __rpc_retrieve_workflow_state( + self, + stream: "grpclib.server.Stream[EmptyRequest, 
RetrieveWorkflowStateResponse]", ) -> None: request = await stream.recv_message() - response = await self.send_ping(request) + response = await self.retrieve_workflow_state(request) await stream.send_message(response) - async def __rpc_send_pong( - self, stream: "grpclib.server.Stream[Pong, IntResponse]" + async def __rpc_propagate_embedded_control_message( + self, + stream: "grpclib.server.Stream[PropagateEmbeddedControlMessageRequest, PropagateEmbeddedControlMessageResponse]", ) -> None: request = await stream.recv_message() - response = await self.send_pong(request) + response = await self.propagate_embedded_control_message(request) await stream.send_message(response) - async def __rpc_send_nested( - self, stream: "grpclib.server.Stream[Nested, StringResponse]" + async def __rpc_take_global_checkpoint( + self, + stream: "grpclib.server.Stream[TakeGlobalCheckpointRequest, TakeGlobalCheckpointResponse]", ) -> None: request = await stream.recv_message() - response = await self.send_nested(request) + response = await self.take_global_checkpoint(request) await stream.send_message(response) - async def __rpc_send_pass( - self, stream: "grpclib.server.Stream[Pass, StringResponse]" + async def __rpc_debug_command( + self, stream: "grpclib.server.Stream[DebugCommandRequest, EmptyReturn]" ) -> None: request = await stream.recv_message() - response = await self.send_pass(request) + response = await self.debug_command(request) await stream.send_message(response) - async def __rpc_send_error_command( - self, stream: "grpclib.server.Stream[ErrorCommand, StringResponse]" + async def __rpc_evaluate_python_expression( + self, + stream: "grpclib.server.Stream[EvaluatePythonExpressionRequest, EvaluatePythonExpressionResponse]", ) -> None: request = await stream.recv_message() - response = await self.send_error_command(request) + response = await self.evaluate_python_expression(request) await stream.send_message(response) - async def __rpc_send_recursion( - self, stream: 
"grpclib.server.Stream[Recursion, StringResponse]" + async def __rpc_console_message_triggered( + self, + stream: "grpclib.server.Stream[ConsoleMessageTriggeredRequest, EmptyReturn]", ) -> None: request = await stream.recv_message() - response = await self.send_recursion(request) + response = await self.console_message_triggered(request) await stream.send_message(response) - async def __rpc_send_collect( - self, stream: "grpclib.server.Stream[Collect, StringResponse]" + async def __rpc_port_completed( + self, stream: "grpclib.server.Stream[PortCompletedRequest, EmptyReturn]" ) -> None: request = await stream.recv_message() - response = await self.send_collect(request) + response = await self.port_completed(request) await stream.send_message(response) - async def __rpc_send_generate_number( - self, stream: "grpclib.server.Stream[GenerateNumber, IntResponse]" + async def __rpc_start_workflow( + self, stream: "grpclib.server.Stream[EmptyRequest, StartWorkflowResponse]" ) -> None: request = await stream.recv_message() - response = await self.send_generate_number(request) + response = await self.start_workflow(request) await stream.send_message(response) - async def __rpc_send_multi_call( - self, stream: "grpclib.server.Stream[MultiCall, StringResponse]" + async def __rpc_resume_workflow( + self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]" ) -> None: request = await stream.recv_message() - response = await self.send_multi_call(request) + response = await self.resume_workflow(request) await stream.send_message(response) - async def __rpc_send_chain( - self, stream: "grpclib.server.Stream[Chain, StringResponse]" + async def __rpc_pause_workflow( + self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]" ) -> None: request = await stream.recv_message() - response = await self.send_chain(request) + response = await self.pause_workflow(request) + await stream.send_message(response) + + async def __rpc_worker_state_updated( + self, stream: 
"grpclib.server.Stream[WorkerStateUpdatedRequest, EmptyReturn]" + ) -> None: + request = await stream.recv_message() + response = await self.worker_state_updated(request) + await stream.send_message(response) + + async def __rpc_worker_execution_completed( + self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]" + ) -> None: + request = await stream.recv_message() + response = await self.worker_execution_completed(request) + await stream.send_message(response) + + async def __rpc_iteration_completed( + self, stream: "grpclib.server.Stream[IterationCompletedRequest, EmptyReturn]" + ) -> None: + request = await stream.recv_message() + response = await self.iteration_completed(request) + await stream.send_message(response) + + async def __rpc_link_workers( + self, stream: "grpclib.server.Stream[LinkWorkersRequest, EmptyReturn]" + ) -> None: + request = await stream.recv_message() + response = await self.link_workers(request) + await stream.send_message(response) + + async def __rpc_controller_initiate_query_statistics( + self, stream: "grpclib.server.Stream[QueryStatisticsRequest, EmptyReturn]" + ) -> None: + request = await stream.recv_message() + response = await self.controller_initiate_query_statistics(request) + await stream.send_message(response) + + async def __rpc_retry_workflow( + self, stream: "grpclib.server.Stream[RetryWorkflowRequest, EmptyReturn]" + ) -> None: + request = await stream.recv_message() + response = await self.retry_workflow(request) await stream.send_message(response) def __mapping__(self) -> Dict[str, grpclib.const.Handler]: return { - "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPing": grpclib.const.Handler( - self.__rpc_send_ping, + "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/RetrieveWorkflowState": grpclib.const.Handler( + self.__rpc_retrieve_workflow_state, grpclib.const.Cardinality.UNARY_UNARY, - Ping, - IntResponse, + EmptyRequest, + RetrieveWorkflowStateResponse, ), - 
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPong": grpclib.const.Handler( - self.__rpc_send_pong, + "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PropagateEmbeddedControlMessage": grpclib.const.Handler( + self.__rpc_propagate_embedded_control_message, grpclib.const.Cardinality.UNARY_UNARY, - Pong, - IntResponse, + PropagateEmbeddedControlMessageRequest, + PropagateEmbeddedControlMessageResponse, ), - "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendNested": grpclib.const.Handler( - self.__rpc_send_nested, + "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/TakeGlobalCheckpoint": grpclib.const.Handler( + self.__rpc_take_global_checkpoint, grpclib.const.Cardinality.UNARY_UNARY, - Nested, - StringResponse, + TakeGlobalCheckpointRequest, + TakeGlobalCheckpointResponse, ), - "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPass": grpclib.const.Handler( - self.__rpc_send_pass, + "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/DebugCommand": grpclib.const.Handler( + self.__rpc_debug_command, grpclib.const.Cardinality.UNARY_UNARY, - Pass, - StringResponse, + DebugCommandRequest, + EmptyReturn, ), - "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendErrorCommand": grpclib.const.Handler( - self.__rpc_send_error_command, + "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/EvaluatePythonExpression": grpclib.const.Handler( + self.__rpc_evaluate_python_expression, grpclib.const.Cardinality.UNARY_UNARY, - ErrorCommand, - StringResponse, + EvaluatePythonExpressionRequest, + EvaluatePythonExpressionResponse, ), - "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendRecursion": grpclib.const.Handler( - self.__rpc_send_recursion, + "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ConsoleMessageTriggered": grpclib.const.Handler( + self.__rpc_console_message_triggered, grpclib.const.Cardinality.UNARY_UNARY, - 
Recursion, - StringResponse, + ConsoleMessageTriggeredRequest, + EmptyReturn, ), - "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendCollect": grpclib.const.Handler( - self.__rpc_send_collect, + "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PortCompleted": grpclib.const.Handler( + self.__rpc_port_completed, grpclib.const.Cardinality.UNARY_UNARY, - Collect, - StringResponse, + PortCompletedRequest, + EmptyReturn, ), - "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendGenerateNumber": grpclib.const.Handler( - self.__rpc_send_generate_number, + "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/StartWorkflow": grpclib.const.Handler( + self.__rpc_start_workflow, grpclib.const.Cardinality.UNARY_UNARY, - GenerateNumber, - IntResponse, + EmptyRequest, + StartWorkflowResponse, ), - "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendMultiCall": grpclib.const.Handler( - self.__rpc_send_multi_call, + "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ResumeWorkflow": grpclib.const.Handler( + self.__rpc_resume_workflow, grpclib.const.Cardinality.UNARY_UNARY, - MultiCall, - StringResponse, + EmptyRequest, + EmptyReturn, ), - "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendChain": grpclib.const.Handler( - self.__rpc_send_chain, + "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PauseWorkflow": grpclib.const.Handler( + self.__rpc_pause_workflow, grpclib.const.Cardinality.UNARY_UNARY, - Chain, - StringResponse, + EmptyRequest, + EmptyReturn, + ), + "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/WorkerStateUpdated": grpclib.const.Handler( + self.__rpc_worker_state_updated, + grpclib.const.Cardinality.UNARY_UNARY, + WorkerStateUpdatedRequest, + EmptyReturn, + ), + "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/WorkerExecutionCompleted": grpclib.const.Handler( + self.__rpc_worker_execution_completed, + 
grpclib.const.Cardinality.UNARY_UNARY, + EmptyRequest, + EmptyReturn, + ), + "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/IterationCompleted": grpclib.const.Handler( + self.__rpc_iteration_completed, + grpclib.const.Cardinality.UNARY_UNARY, + IterationCompletedRequest, + EmptyReturn, + ), + "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/LinkWorkers": grpclib.const.Handler( + self.__rpc_link_workers, + grpclib.const.Cardinality.UNARY_UNARY, + LinkWorkersRequest, + EmptyReturn, + ), + "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ControllerInitiateQueryStatistics": grpclib.const.Handler( + self.__rpc_controller_initiate_query_statistics, + grpclib.const.Cardinality.UNARY_UNARY, + QueryStatisticsRequest, + EmptyReturn, + ), + "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/RetryWorkflow": grpclib.const.Handler( + self.__rpc_retry_workflow, + grpclib.const.Cardinality.UNARY_UNARY, + RetryWorkflowRequest, + EmptyReturn, ), } -class WorkerServiceBase(ServiceBase): - - async def add_input_channel( - self, add_input_channel_request: "AddInputChannelRequest" - ) -> "EmptyReturn": - raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - - async def add_partitioning( - self, add_partitioning_request: "AddPartitioningRequest" - ) -> "EmptyReturn": - raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - - async def assign_port( - self, assign_port_request: "AssignPortRequest" - ) -> "EmptyReturn": - raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - - async def finalize_checkpoint( - self, finalize_checkpoint_request: "FinalizeCheckpointRequest" - ) -> "FinalizeCheckpointResponse": - raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - - async def flush_network_buffer( - self, empty_request: "EmptyRequest" - ) -> "EmptyReturn": - raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - - async def initialize_executor( - self, initialize_executor_request: 
"InitializeExecutorRequest" - ) -> "EmptyReturn": - raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - - async def open_executor(self, empty_request: "EmptyRequest") -> "EmptyReturn": - raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - - async def pause_worker( - self, empty_request: "EmptyRequest" - ) -> "WorkerStateResponse": - raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - - async def prepare_checkpoint( - self, prepare_checkpoint_request: "PrepareCheckpointRequest" - ) -> "EmptyReturn": - raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - - async def query_statistics( - self, empty_request: "EmptyRequest" - ) -> "WorkerMetricsResponse": - raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) +class RpcTesterBase(ServiceBase): - async def resume_worker( - self, empty_request: "EmptyRequest" - ) -> "WorkerStateResponse": + async def send_ping(self, ping: "Ping") -> "IntResponse": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def retrieve_state(self, empty_request: "EmptyRequest") -> "EmptyReturn": + async def send_pong(self, pong: "Pong") -> "IntResponse": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def retry_current_tuple(self, empty_request: "EmptyRequest") -> "EmptyReturn": + async def send_nested(self, nested: "Nested") -> "StringResponse": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def start_worker( - self, empty_request: "EmptyRequest" - ) -> "WorkerStateResponse": + async def send_pass(self, pass_: "Pass") -> "StringResponse": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def end_worker(self, empty_request: "EmptyRequest") -> "EmptyReturn": + async def send_error_command( + self, error_command: "ErrorCommand" + ) -> "StringResponse": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def start_channel(self, empty_request: "EmptyRequest") -> "EmptyReturn": + async def send_recursion(self, 
recursion: "Recursion") -> "StringResponse": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def end_channel(self, empty_request: "EmptyRequest") -> "EmptyReturn": + async def send_collect(self, collect: "Collect") -> "StringResponse": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def debug_command( - self, debug_command_request: "DebugCommandRequest" - ) -> "EmptyReturn": + async def send_generate_number( + self, generate_number: "GenerateNumber" + ) -> "IntResponse": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def evaluate_python_expression( - self, evaluate_python_expression_request: "EvaluatePythonExpressionRequest" - ) -> "EvaluatedValue": + async def send_multi_call(self, multi_call: "MultiCall") -> "StringResponse": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def no_operation(self, empty_request: "EmptyRequest") -> "EmptyReturn": + async def send_chain(self, chain: "Chain") -> "StringResponse": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def __rpc_add_input_channel( - self, stream: "grpclib.server.Stream[AddInputChannelRequest, EmptyReturn]" - ) -> None: - request = await stream.recv_message() - response = await self.add_input_channel(request) - await stream.send_message(response) - - async def __rpc_add_partitioning( - self, stream: "grpclib.server.Stream[AddPartitioningRequest, EmptyReturn]" - ) -> None: - request = await stream.recv_message() - response = await self.add_partitioning(request) - await stream.send_message(response) - - async def __rpc_assign_port( - self, stream: "grpclib.server.Stream[AssignPortRequest, EmptyReturn]" - ) -> None: - request = await stream.recv_message() - response = await self.assign_port(request) - await stream.send_message(response) - - async def __rpc_finalize_checkpoint( - self, - stream: "grpclib.server.Stream[FinalizeCheckpointRequest, FinalizeCheckpointResponse]", - ) -> None: - request = await 
stream.recv_message() - response = await self.finalize_checkpoint(request) - await stream.send_message(response) - - async def __rpc_flush_network_buffer( - self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]" - ) -> None: - request = await stream.recv_message() - response = await self.flush_network_buffer(request) - await stream.send_message(response) - - async def __rpc_initialize_executor( - self, stream: "grpclib.server.Stream[InitializeExecutorRequest, EmptyReturn]" - ) -> None: - request = await stream.recv_message() - response = await self.initialize_executor(request) - await stream.send_message(response) - - async def __rpc_open_executor( - self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]" - ) -> None: - request = await stream.recv_message() - response = await self.open_executor(request) - await stream.send_message(response) - - async def __rpc_pause_worker( - self, stream: "grpclib.server.Stream[EmptyRequest, WorkerStateResponse]" - ) -> None: - request = await stream.recv_message() - response = await self.pause_worker(request) - await stream.send_message(response) - - async def __rpc_prepare_checkpoint( - self, stream: "grpclib.server.Stream[PrepareCheckpointRequest, EmptyReturn]" - ) -> None: - request = await stream.recv_message() - response = await self.prepare_checkpoint(request) - await stream.send_message(response) - - async def __rpc_query_statistics( - self, stream: "grpclib.server.Stream[EmptyRequest, WorkerMetricsResponse]" - ) -> None: - request = await stream.recv_message() - response = await self.query_statistics(request) - await stream.send_message(response) - - async def __rpc_resume_worker( - self, stream: "grpclib.server.Stream[EmptyRequest, WorkerStateResponse]" + async def __rpc_send_ping( + self, stream: "grpclib.server.Stream[Ping, IntResponse]" ) -> None: request = await stream.recv_message() - response = await self.resume_worker(request) + response = await self.send_ping(request) await 
stream.send_message(response) - async def __rpc_retrieve_state( - self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]" + async def __rpc_send_pong( + self, stream: "grpclib.server.Stream[Pong, IntResponse]" ) -> None: request = await stream.recv_message() - response = await self.retrieve_state(request) + response = await self.send_pong(request) await stream.send_message(response) - async def __rpc_retry_current_tuple( - self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]" + async def __rpc_send_nested( + self, stream: "grpclib.server.Stream[Nested, StringResponse]" ) -> None: request = await stream.recv_message() - response = await self.retry_current_tuple(request) + response = await self.send_nested(request) await stream.send_message(response) - async def __rpc_start_worker( - self, stream: "grpclib.server.Stream[EmptyRequest, WorkerStateResponse]" + async def __rpc_send_pass( + self, stream: "grpclib.server.Stream[Pass, StringResponse]" ) -> None: request = await stream.recv_message() - response = await self.start_worker(request) + response = await self.send_pass(request) await stream.send_message(response) - - async def __rpc_end_worker( - self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]" + + async def __rpc_send_error_command( + self, stream: "grpclib.server.Stream[ErrorCommand, StringResponse]" ) -> None: request = await stream.recv_message() - response = await self.end_worker(request) + response = await self.send_error_command(request) await stream.send_message(response) - async def __rpc_start_channel( - self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]" + async def __rpc_send_recursion( + self, stream: "grpclib.server.Stream[Recursion, StringResponse]" ) -> None: request = await stream.recv_message() - response = await self.start_channel(request) + response = await self.send_recursion(request) await stream.send_message(response) - async def __rpc_end_channel( - self, stream: 
"grpclib.server.Stream[EmptyRequest, EmptyReturn]" + async def __rpc_send_collect( + self, stream: "grpclib.server.Stream[Collect, StringResponse]" ) -> None: request = await stream.recv_message() - response = await self.end_channel(request) + response = await self.send_collect(request) await stream.send_message(response) - async def __rpc_debug_command( - self, stream: "grpclib.server.Stream[DebugCommandRequest, EmptyReturn]" + async def __rpc_send_generate_number( + self, stream: "grpclib.server.Stream[GenerateNumber, IntResponse]" ) -> None: request = await stream.recv_message() - response = await self.debug_command(request) + response = await self.send_generate_number(request) await stream.send_message(response) - async def __rpc_evaluate_python_expression( - self, - stream: "grpclib.server.Stream[EvaluatePythonExpressionRequest, EvaluatedValue]", + async def __rpc_send_multi_call( + self, stream: "grpclib.server.Stream[MultiCall, StringResponse]" ) -> None: request = await stream.recv_message() - response = await self.evaluate_python_expression(request) + response = await self.send_multi_call(request) await stream.send_message(response) - async def __rpc_no_operation( - self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]" + async def __rpc_send_chain( + self, stream: "grpclib.server.Stream[Chain, StringResponse]" ) -> None: request = await stream.recv_message() - response = await self.no_operation(request) + response = await self.send_chain(request) await stream.send_message(response) def __mapping__(self) -> Dict[str, grpclib.const.Handler]: return { - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AddInputChannel": grpclib.const.Handler( - self.__rpc_add_input_channel, - grpclib.const.Cardinality.UNARY_UNARY, - AddInputChannelRequest, - EmptyReturn, - ), - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AddPartitioning": grpclib.const.Handler( - self.__rpc_add_partitioning, - grpclib.const.Cardinality.UNARY_UNARY, 
- AddPartitioningRequest, - EmptyReturn, - ), - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AssignPort": grpclib.const.Handler( - self.__rpc_assign_port, - grpclib.const.Cardinality.UNARY_UNARY, - AssignPortRequest, - EmptyReturn, - ), - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/FinalizeCheckpoint": grpclib.const.Handler( - self.__rpc_finalize_checkpoint, - grpclib.const.Cardinality.UNARY_UNARY, - FinalizeCheckpointRequest, - FinalizeCheckpointResponse, - ), - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/FlushNetworkBuffer": grpclib.const.Handler( - self.__rpc_flush_network_buffer, - grpclib.const.Cardinality.UNARY_UNARY, - EmptyRequest, - EmptyReturn, - ), - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/InitializeExecutor": grpclib.const.Handler( - self.__rpc_initialize_executor, - grpclib.const.Cardinality.UNARY_UNARY, - InitializeExecutorRequest, - EmptyReturn, - ), - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/OpenExecutor": grpclib.const.Handler( - self.__rpc_open_executor, - grpclib.const.Cardinality.UNARY_UNARY, - EmptyRequest, - EmptyReturn, - ), - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/PauseWorker": grpclib.const.Handler( - self.__rpc_pause_worker, - grpclib.const.Cardinality.UNARY_UNARY, - EmptyRequest, - WorkerStateResponse, - ), - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/PrepareCheckpoint": grpclib.const.Handler( - self.__rpc_prepare_checkpoint, - grpclib.const.Cardinality.UNARY_UNARY, - PrepareCheckpointRequest, - EmptyReturn, - ), - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/QueryStatistics": grpclib.const.Handler( - self.__rpc_query_statistics, - grpclib.const.Cardinality.UNARY_UNARY, - EmptyRequest, - WorkerMetricsResponse, - ), - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/ResumeWorker": grpclib.const.Handler( - self.__rpc_resume_worker, + 
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPing": grpclib.const.Handler( + self.__rpc_send_ping, grpclib.const.Cardinality.UNARY_UNARY, - EmptyRequest, - WorkerStateResponse, + Ping, + IntResponse, ), - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/RetrieveState": grpclib.const.Handler( - self.__rpc_retrieve_state, + "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPong": grpclib.const.Handler( + self.__rpc_send_pong, grpclib.const.Cardinality.UNARY_UNARY, - EmptyRequest, - EmptyReturn, + Pong, + IntResponse, ), - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/RetryCurrentTuple": grpclib.const.Handler( - self.__rpc_retry_current_tuple, + "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendNested": grpclib.const.Handler( + self.__rpc_send_nested, grpclib.const.Cardinality.UNARY_UNARY, - EmptyRequest, - EmptyReturn, + Nested, + StringResponse, ), - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/StartWorker": grpclib.const.Handler( - self.__rpc_start_worker, + "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPass": grpclib.const.Handler( + self.__rpc_send_pass, grpclib.const.Cardinality.UNARY_UNARY, - EmptyRequest, - WorkerStateResponse, + Pass, + StringResponse, ), - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EndWorker": grpclib.const.Handler( - self.__rpc_end_worker, + "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendErrorCommand": grpclib.const.Handler( + self.__rpc_send_error_command, grpclib.const.Cardinality.UNARY_UNARY, - EmptyRequest, - EmptyReturn, + ErrorCommand, + StringResponse, ), - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/StartChannel": grpclib.const.Handler( - self.__rpc_start_channel, + "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendRecursion": grpclib.const.Handler( + self.__rpc_send_recursion, grpclib.const.Cardinality.UNARY_UNARY, - EmptyRequest, - EmptyReturn, 
+ Recursion, + StringResponse, ), - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EndChannel": grpclib.const.Handler( - self.__rpc_end_channel, + "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendCollect": grpclib.const.Handler( + self.__rpc_send_collect, grpclib.const.Cardinality.UNARY_UNARY, - EmptyRequest, - EmptyReturn, + Collect, + StringResponse, ), - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/DebugCommand": grpclib.const.Handler( - self.__rpc_debug_command, + "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendGenerateNumber": grpclib.const.Handler( + self.__rpc_send_generate_number, grpclib.const.Cardinality.UNARY_UNARY, - DebugCommandRequest, - EmptyReturn, + GenerateNumber, + IntResponse, ), - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EvaluatePythonExpression": grpclib.const.Handler( - self.__rpc_evaluate_python_expression, + "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendMultiCall": grpclib.const.Handler( + self.__rpc_send_multi_call, grpclib.const.Cardinality.UNARY_UNARY, - EvaluatePythonExpressionRequest, - EvaluatedValue, + MultiCall, + StringResponse, ), - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/NoOperation": grpclib.const.Handler( - self.__rpc_no_operation, + "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendChain": grpclib.const.Handler( + self.__rpc_send_chain, grpclib.const.Cardinality.UNARY_UNARY, - EmptyRequest, - EmptyReturn, + Chain, + StringResponse, ), } -class ControllerServiceBase(ServiceBase): +class WorkerServiceBase(ServiceBase): - async def retrieve_workflow_state( - self, empty_request: "EmptyRequest" - ) -> "RetrieveWorkflowStateResponse": + async def add_input_channel( + self, add_input_channel_request: "AddInputChannelRequest" + ) -> "EmptyReturn": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def propagate_embedded_control_message( - self, - 
propagate_embedded_control_message_request: "PropagateEmbeddedControlMessageRequest", - ) -> "PropagateEmbeddedControlMessageResponse": + async def add_partitioning( + self, add_partitioning_request: "AddPartitioningRequest" + ) -> "EmptyReturn": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def take_global_checkpoint( - self, take_global_checkpoint_request: "TakeGlobalCheckpointRequest" - ) -> "TakeGlobalCheckpointResponse": + async def assign_port( + self, assign_port_request: "AssignPortRequest" + ) -> "EmptyReturn": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def debug_command( - self, debug_command_request: "DebugCommandRequest" - ) -> "EmptyReturn": + async def finalize_checkpoint( + self, finalize_checkpoint_request: "FinalizeCheckpointRequest" + ) -> "FinalizeCheckpointResponse": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def evaluate_python_expression( - self, evaluate_python_expression_request: "EvaluatePythonExpressionRequest" - ) -> "EvaluatePythonExpressionResponse": + async def flush_network_buffer( + self, empty_request: "EmptyRequest" + ) -> "EmptyReturn": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def console_message_triggered( - self, console_message_triggered_request: "ConsoleMessageTriggeredRequest" + async def initialize_executor( + self, initialize_executor_request: "InitializeExecutorRequest" ) -> "EmptyReturn": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def port_completed( - self, port_completed_request: "PortCompletedRequest" + async def open_executor(self, empty_request: "EmptyRequest") -> "EmptyReturn": + raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) + + async def pause_worker( + self, empty_request: "EmptyRequest" + ) -> "WorkerStateResponse": + raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) + + async def prepare_checkpoint( + self, prepare_checkpoint_request: "PrepareCheckpointRequest" ) -> 
"EmptyReturn": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def start_workflow( + async def query_statistics( self, empty_request: "EmptyRequest" - ) -> "StartWorkflowResponse": + ) -> "WorkerMetricsResponse": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def resume_workflow(self, empty_request: "EmptyRequest") -> "EmptyReturn": + async def resume_worker( + self, empty_request: "EmptyRequest" + ) -> "WorkerStateResponse": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def pause_workflow(self, empty_request: "EmptyRequest") -> "EmptyReturn": + async def retrieve_state(self, empty_request: "EmptyRequest") -> "EmptyReturn": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def worker_state_updated( - self, worker_state_updated_request: "WorkerStateUpdatedRequest" - ) -> "EmptyReturn": + async def retry_current_tuple(self, empty_request: "EmptyRequest") -> "EmptyReturn": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def worker_execution_completed( + async def start_worker( self, empty_request: "EmptyRequest" - ) -> "EmptyReturn": + ) -> "WorkerStateResponse": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def link_workers( - self, link_workers_request: "LinkWorkersRequest" + async def end_worker(self, empty_request: "EmptyRequest") -> "EmptyReturn": + raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) + + async def start_channel(self, empty_request: "EmptyRequest") -> "EmptyReturn": + raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) + + async def end_channel(self, empty_request: "EmptyRequest") -> "EmptyReturn": + raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) + + async def debug_command( + self, debug_command_request: "DebugCommandRequest" ) -> "EmptyReturn": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def controller_initiate_query_statistics( - self, query_statistics_request: 
"QueryStatisticsRequest" - ) -> "EmptyReturn": + async def evaluate_python_expression( + self, evaluate_python_expression_request: "EvaluatePythonExpressionRequest" + ) -> "EvaluatedValue": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def retry_workflow( - self, retry_workflow_request: "RetryWorkflowRequest" - ) -> "EmptyReturn": + async def no_operation(self, empty_request: "EmptyRequest") -> "EmptyReturn": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def __rpc_retrieve_workflow_state( - self, - stream: "grpclib.server.Stream[EmptyRequest, RetrieveWorkflowStateResponse]", + async def __rpc_add_input_channel( + self, stream: "grpclib.server.Stream[AddInputChannelRequest, EmptyReturn]" ) -> None: request = await stream.recv_message() - response = await self.retrieve_workflow_state(request) + response = await self.add_input_channel(request) await stream.send_message(response) - async def __rpc_propagate_embedded_control_message( - self, - stream: "grpclib.server.Stream[PropagateEmbeddedControlMessageRequest, PropagateEmbeddedControlMessageResponse]", + async def __rpc_add_partitioning( + self, stream: "grpclib.server.Stream[AddPartitioningRequest, EmptyReturn]" ) -> None: request = await stream.recv_message() - response = await self.propagate_embedded_control_message(request) + response = await self.add_partitioning(request) await stream.send_message(response) - async def __rpc_take_global_checkpoint( + async def __rpc_assign_port( + self, stream: "grpclib.server.Stream[AssignPortRequest, EmptyReturn]" + ) -> None: + request = await stream.recv_message() + response = await self.assign_port(request) + await stream.send_message(response) + + async def __rpc_finalize_checkpoint( self, - stream: "grpclib.server.Stream[TakeGlobalCheckpointRequest, TakeGlobalCheckpointResponse]", + stream: "grpclib.server.Stream[FinalizeCheckpointRequest, FinalizeCheckpointResponse]", ) -> None: request = await stream.recv_message() - response 
= await self.take_global_checkpoint(request) + response = await self.finalize_checkpoint(request) await stream.send_message(response) - async def __rpc_debug_command( - self, stream: "grpclib.server.Stream[DebugCommandRequest, EmptyReturn]" + async def __rpc_flush_network_buffer( + self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]" ) -> None: request = await stream.recv_message() - response = await self.debug_command(request) + response = await self.flush_network_buffer(request) await stream.send_message(response) - async def __rpc_evaluate_python_expression( - self, - stream: "grpclib.server.Stream[EvaluatePythonExpressionRequest, EvaluatePythonExpressionResponse]", + async def __rpc_initialize_executor( + self, stream: "grpclib.server.Stream[InitializeExecutorRequest, EmptyReturn]" ) -> None: request = await stream.recv_message() - response = await self.evaluate_python_expression(request) + response = await self.initialize_executor(request) await stream.send_message(response) - async def __rpc_console_message_triggered( - self, - stream: "grpclib.server.Stream[ConsoleMessageTriggeredRequest, EmptyReturn]", + async def __rpc_open_executor( + self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]" ) -> None: request = await stream.recv_message() - response = await self.console_message_triggered(request) + response = await self.open_executor(request) await stream.send_message(response) - async def __rpc_port_completed( - self, stream: "grpclib.server.Stream[PortCompletedRequest, EmptyReturn]" + async def __rpc_pause_worker( + self, stream: "grpclib.server.Stream[EmptyRequest, WorkerStateResponse]" ) -> None: request = await stream.recv_message() - response = await self.port_completed(request) + response = await self.pause_worker(request) await stream.send_message(response) - async def __rpc_start_workflow( - self, stream: "grpclib.server.Stream[EmptyRequest, StartWorkflowResponse]" + async def __rpc_prepare_checkpoint( + self, stream: 
"grpclib.server.Stream[PrepareCheckpointRequest, EmptyReturn]" ) -> None: request = await stream.recv_message() - response = await self.start_workflow(request) + response = await self.prepare_checkpoint(request) await stream.send_message(response) - async def __rpc_resume_workflow( + async def __rpc_query_statistics( + self, stream: "grpclib.server.Stream[EmptyRequest, WorkerMetricsResponse]" + ) -> None: + request = await stream.recv_message() + response = await self.query_statistics(request) + await stream.send_message(response) + + async def __rpc_resume_worker( + self, stream: "grpclib.server.Stream[EmptyRequest, WorkerStateResponse]" + ) -> None: + request = await stream.recv_message() + response = await self.resume_worker(request) + await stream.send_message(response) + + async def __rpc_retrieve_state( self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]" ) -> None: request = await stream.recv_message() - response = await self.resume_workflow(request) + response = await self.retrieve_state(request) await stream.send_message(response) - async def __rpc_pause_workflow( + async def __rpc_retry_current_tuple( self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]" ) -> None: request = await stream.recv_message() - response = await self.pause_workflow(request) + response = await self.retry_current_tuple(request) await stream.send_message(response) - async def __rpc_worker_state_updated( - self, stream: "grpclib.server.Stream[WorkerStateUpdatedRequest, EmptyReturn]" + async def __rpc_start_worker( + self, stream: "grpclib.server.Stream[EmptyRequest, WorkerStateResponse]" ) -> None: request = await stream.recv_message() - response = await self.worker_state_updated(request) + response = await self.start_worker(request) await stream.send_message(response) - async def __rpc_worker_execution_completed( + async def __rpc_end_worker( self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]" ) -> None: request = await stream.recv_message() - 
response = await self.worker_execution_completed(request) + response = await self.end_worker(request) await stream.send_message(response) - async def __rpc_link_workers( - self, stream: "grpclib.server.Stream[LinkWorkersRequest, EmptyReturn]" + async def __rpc_start_channel( + self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]" ) -> None: request = await stream.recv_message() - response = await self.link_workers(request) + response = await self.start_channel(request) await stream.send_message(response) - async def __rpc_controller_initiate_query_statistics( - self, stream: "grpclib.server.Stream[QueryStatisticsRequest, EmptyReturn]" + async def __rpc_end_channel( + self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]" ) -> None: request = await stream.recv_message() - response = await self.controller_initiate_query_statistics(request) + response = await self.end_channel(request) await stream.send_message(response) - async def __rpc_retry_workflow( - self, stream: "grpclib.server.Stream[RetryWorkflowRequest, EmptyReturn]" + async def __rpc_debug_command( + self, stream: "grpclib.server.Stream[DebugCommandRequest, EmptyReturn]" ) -> None: request = await stream.recv_message() - response = await self.retry_workflow(request) + response = await self.debug_command(request) + await stream.send_message(response) + + async def __rpc_evaluate_python_expression( + self, + stream: "grpclib.server.Stream[EvaluatePythonExpressionRequest, EvaluatedValue]", + ) -> None: + request = await stream.recv_message() + response = await self.evaluate_python_expression(request) + await stream.send_message(response) + + async def __rpc_no_operation( + self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]" + ) -> None: + request = await stream.recv_message() + response = await self.no_operation(request) await stream.send_message(response) def __mapping__(self) -> Dict[str, grpclib.const.Handler]: return { - 
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/RetrieveWorkflowState": grpclib.const.Handler( - self.__rpc_retrieve_workflow_state, + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AddInputChannel": grpclib.const.Handler( + self.__rpc_add_input_channel, grpclib.const.Cardinality.UNARY_UNARY, - EmptyRequest, - RetrieveWorkflowStateResponse, + AddInputChannelRequest, + EmptyReturn, ), - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PropagateEmbeddedControlMessage": grpclib.const.Handler( - self.__rpc_propagate_embedded_control_message, + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AddPartitioning": grpclib.const.Handler( + self.__rpc_add_partitioning, grpclib.const.Cardinality.UNARY_UNARY, - PropagateEmbeddedControlMessageRequest, - PropagateEmbeddedControlMessageResponse, + AddPartitioningRequest, + EmptyReturn, ), - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/TakeGlobalCheckpoint": grpclib.const.Handler( - self.__rpc_take_global_checkpoint, + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AssignPort": grpclib.const.Handler( + self.__rpc_assign_port, grpclib.const.Cardinality.UNARY_UNARY, - TakeGlobalCheckpointRequest, - TakeGlobalCheckpointResponse, + AssignPortRequest, + EmptyReturn, ), - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/DebugCommand": grpclib.const.Handler( - self.__rpc_debug_command, + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/FinalizeCheckpoint": grpclib.const.Handler( + self.__rpc_finalize_checkpoint, grpclib.const.Cardinality.UNARY_UNARY, - DebugCommandRequest, + FinalizeCheckpointRequest, + FinalizeCheckpointResponse, + ), + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/FlushNetworkBuffer": grpclib.const.Handler( + self.__rpc_flush_network_buffer, + grpclib.const.Cardinality.UNARY_UNARY, + EmptyRequest, EmptyReturn, ), - 
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/EvaluatePythonExpression": grpclib.const.Handler( - self.__rpc_evaluate_python_expression, + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/InitializeExecutor": grpclib.const.Handler( + self.__rpc_initialize_executor, grpclib.const.Cardinality.UNARY_UNARY, - EvaluatePythonExpressionRequest, - EvaluatePythonExpressionResponse, + InitializeExecutorRequest, + EmptyReturn, ), - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ConsoleMessageTriggered": grpclib.const.Handler( - self.__rpc_console_message_triggered, + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/OpenExecutor": grpclib.const.Handler( + self.__rpc_open_executor, grpclib.const.Cardinality.UNARY_UNARY, - ConsoleMessageTriggeredRequest, + EmptyRequest, EmptyReturn, ), - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PortCompleted": grpclib.const.Handler( - self.__rpc_port_completed, + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/PauseWorker": grpclib.const.Handler( + self.__rpc_pause_worker, grpclib.const.Cardinality.UNARY_UNARY, - PortCompletedRequest, + EmptyRequest, + WorkerStateResponse, + ), + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/PrepareCheckpoint": grpclib.const.Handler( + self.__rpc_prepare_checkpoint, + grpclib.const.Cardinality.UNARY_UNARY, + PrepareCheckpointRequest, EmptyReturn, ), - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/StartWorkflow": grpclib.const.Handler( - self.__rpc_start_workflow, + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/QueryStatistics": grpclib.const.Handler( + self.__rpc_query_statistics, grpclib.const.Cardinality.UNARY_UNARY, EmptyRequest, - StartWorkflowResponse, + WorkerMetricsResponse, ), - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ResumeWorkflow": grpclib.const.Handler( - self.__rpc_resume_workflow, + 
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/ResumeWorker": grpclib.const.Handler( + self.__rpc_resume_worker, + grpclib.const.Cardinality.UNARY_UNARY, + EmptyRequest, + WorkerStateResponse, + ), + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/RetrieveState": grpclib.const.Handler( + self.__rpc_retrieve_state, grpclib.const.Cardinality.UNARY_UNARY, EmptyRequest, EmptyReturn, ), - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PauseWorkflow": grpclib.const.Handler( - self.__rpc_pause_workflow, + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/RetryCurrentTuple": grpclib.const.Handler( + self.__rpc_retry_current_tuple, grpclib.const.Cardinality.UNARY_UNARY, EmptyRequest, EmptyReturn, ), - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/WorkerStateUpdated": grpclib.const.Handler( - self.__rpc_worker_state_updated, + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/StartWorker": grpclib.const.Handler( + self.__rpc_start_worker, grpclib.const.Cardinality.UNARY_UNARY, - WorkerStateUpdatedRequest, + EmptyRequest, + WorkerStateResponse, + ), + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EndWorker": grpclib.const.Handler( + self.__rpc_end_worker, + grpclib.const.Cardinality.UNARY_UNARY, + EmptyRequest, EmptyReturn, ), - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/WorkerExecutionCompleted": grpclib.const.Handler( - self.__rpc_worker_execution_completed, + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/StartChannel": grpclib.const.Handler( + self.__rpc_start_channel, grpclib.const.Cardinality.UNARY_UNARY, EmptyRequest, EmptyReturn, ), - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/LinkWorkers": grpclib.const.Handler( - self.__rpc_link_workers, + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EndChannel": grpclib.const.Handler( + self.__rpc_end_channel, 
grpclib.const.Cardinality.UNARY_UNARY, - LinkWorkersRequest, + EmptyRequest, EmptyReturn, ), - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ControllerInitiateQueryStatistics": grpclib.const.Handler( - self.__rpc_controller_initiate_query_statistics, + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/DebugCommand": grpclib.const.Handler( + self.__rpc_debug_command, grpclib.const.Cardinality.UNARY_UNARY, - QueryStatisticsRequest, + DebugCommandRequest, EmptyReturn, ), - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/RetryWorkflow": grpclib.const.Handler( - self.__rpc_retry_workflow, + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EvaluatePythonExpression": grpclib.const.Handler( + self.__rpc_evaluate_python_expression, grpclib.const.Cardinality.UNARY_UNARY, - RetryWorkflowRequest, + EvaluatePythonExpressionRequest, + EvaluatedValue, + ), + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/NoOperation": grpclib.const.Handler( + self.__rpc_no_operation, + grpclib.const.Cardinality.UNARY_UNARY, + EmptyRequest, EmptyReturn, ), } diff --git a/amber/src/main/python/pytexera/__init__.py b/amber/src/main/python/pytexera/__init__.py index e40d1a43fe0..c6001667380 100644 --- a/amber/src/main/python/pytexera/__init__.py +++ b/amber/src/main/python/pytexera/__init__.py @@ -19,6 +19,7 @@ from overrides import overrides from typing import Iterator, Optional, Union +from core.models.operator import LoopStartOperator, LoopEndOperator from pyamber import * from .storage.dataset_file_document import DatasetFileDocument from .storage.large_binary_input_stream import LargeBinaryInputStream @@ -43,6 +44,8 @@ "UDFTableOperator", "UDFBatchOperator", "UDFSourceOperator", + "LoopStartOperator", + "LoopEndOperator", "DatasetFileDocument", "largebinary", "LargeBinaryInputStream", diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AmberProcessor.scala 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AmberProcessor.scala index e7763073232..22811b46417 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AmberProcessor.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AmberProcessor.scala @@ -43,7 +43,7 @@ abstract class AmberProcessor( with Serializable { /** FIFO & exactly once */ - val inputGateway: InputGateway = new NetworkInputGateway(this.actorId) + val inputGateway: NetworkInputGateway = new NetworkInputGateway(this.actorId) // 1. Unified Output val outputGateway: NetworkOutputGateway = @@ -55,7 +55,7 @@ abstract class AmberProcessor( } ) // 2. RPC Layer - val asyncRPCClient = new AsyncRPCClient(outputGateway, actorId) + val asyncRPCClient = new AsyncRPCClient(inputGateway, outputGateway, actorId) val asyncRPCServer: AsyncRPCServer = new AsyncRPCServer(outputGateway, actorId) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerAsyncRPCHandlerInitializer.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerAsyncRPCHandlerInitializer.scala index 4d9a36bab43..ae069db0786 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerAsyncRPCHandlerInitializer.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerAsyncRPCHandlerInitializer.scala @@ -34,6 +34,7 @@ class ControllerAsyncRPCHandlerInitializer( with AmberLogging with LinkWorkersHandler with WorkerExecutionCompletedHandler + with IterationCompletedHandler with WorkerStateUpdatedHandler with PauseHandler with QueryWorkerStatisticsHandler diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerProcessor.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerProcessor.scala index 7a8e94cf3a7..3461619cb36 100644 --- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerProcessor.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerProcessor.scala @@ -44,7 +44,7 @@ class ControllerProcessor( val workflowScheduler: WorkflowScheduler = new WorkflowScheduler(workflowContext, actorId) val workflowExecutionCoordinator: WorkflowExecutionCoordinator = new WorkflowExecutionCoordinator( - () => this.workflowScheduler.getNextRegions, + workflowScheduler, workflowExecution, controllerConfig, asyncRPCClient diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala index b1acb3c0650..ff6df1f0a06 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala @@ -32,7 +32,7 @@ class WorkflowScheduler( actorId: ActorVirtualIdentity ) extends java.io.Serializable { var physicalPlan: PhysicalPlan = _ - private var schedule: Schedule = _ + var schedule: Schedule = _ def getSchedule: Schedule = schedule diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala index b806479b892..2de29f31fdd 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala @@ -44,6 +44,7 @@ case class WorkflowExecution() { * @throws AssertionError if the `RegionExecution` has already been initialized. 
*/ def initRegionExecution(region: Region): RegionExecution = { + regionExecutions.remove(region.id) // ensure the region execution hasn't been initialized already. assert( !regionExecutions.contains(region.id), diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/IterationCompletedHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/IterationCompletedHandler.scala new file mode 100644 index 00000000000..d87440fc6e3 --- /dev/null +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/IterationCompletedHandler.scala @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.texera.amber.engine.architecture.controller.promisehandlers + +import com.twitter.util.Future +import org.apache.texera.amber.engine.architecture.controller.{ControllerAsyncRPCHandlerInitializer, ExecutionStateUpdate} +import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{AsyncRPCContext, EmptyRequest, IterationCompletedRequest, QueryStatisticsRequest, StatisticsUpdateTarget} +import org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn +import org.apache.texera.amber.engine.common.virtualidentity.util.SELF + +/** indicate a worker has completed its execution + * i.e. received and processed all data from upstreams + * note that this doesn't mean all the output of this worker + * has been received by the downstream workers. + * + * possible sender: worker + */ +trait IterationCompletedHandler { + this: ControllerAsyncRPCHandlerInitializer => + + override def iterationCompleted( + msg: IterationCompletedRequest, + ctx: AsyncRPCContext + ): Future[EmptyReturn] = { + cp.workflowExecutionCoordinator.loopBack(msg.loopStartId) + EmptyReturn() + } +} diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/AmberFIFOChannel.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/AmberFIFOChannel.scala index d81b4239ba7..b7611c8f8f9 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/AmberFIFOChannel.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/AmberFIFOChannel.scala @@ -41,6 +41,7 @@ class AmberFIFOChannel(val channelId: ChannelIdentity) extends AmberLogging { private var portId: Option[PortIdentity] = None def acceptMessage(msg: WorkflowFIFOMessage): Unit = { + //channel remove val seq = msg.sequenceNumber val payload = msg.payload if (isDuplicated(seq)) { diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkInputGateway.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkInputGateway.scala index 5cfd8aabc04..1d3ee3cb72c 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkInputGateway.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkInputGateway.scala @@ -86,4 +86,8 @@ class NetworkInputGateway(val actorId: ActorVirtualIdentity) enforcers += enforcer } + def removeControlChannel(from: ActorVirtualIdentity): Unit = { + inputChannels.remove(ChannelIdentity(from, actorId, isControl = true)) + } + } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkOutputGateway.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkOutputGateway.scala index 929a30f4efa..e35e819d41f 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkOutputGateway.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkOutputGateway.scala @@ -94,4 +94,8 @@ class NetworkOutputGateway( idToSequenceNums.getOrElseUpdate(channelId, new AtomicLong()).getAndIncrement() } + def removeControlChannel(to: ActorVirtualIdentity): Unit = { + idToSequenceNums.remove(ChannelIdentity(actorId, to, isControl = true)) + } + } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala index 4ab3d18056f..a018158f369 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala @@ -22,6 +22,7 @@ package 
org.apache.texera.amber.engine.architecture.messaginglayer import org.apache.texera.amber.core.state.State import org.apache.texera.amber.core.storage.DocumentFactory import org.apache.texera.amber.core.storage.model.BufferedItemWriter +import org.apache.texera.amber.core.storage.result.ResultSchema import org.apache.texera.amber.core.tuple._ import org.apache.texera.amber.core.virtualidentity.{ActorVirtualIdentity, ChannelIdentity} import org.apache.texera.amber.core.workflow.{PhysicalLink, PortIdentity} @@ -124,6 +125,8 @@ class OutputManager( : mutable.HashMap[PortIdentity, OutputPortResultWriterThread] = mutable.HashMap() + private val storageUris: mutable.HashMap[Int, URI] = mutable.HashMap() + /** * Add down stream operator and its corresponding Partitioner. * @@ -232,6 +235,15 @@ class OutputManager( }) } + def saveStateToStorageIfNeeded(state: State, outputPortId: Int): Unit = { + val writer = DocumentFactory + .createDocument(this.storageUris(outputPortId).resolve("state"), state.schema) + .writer(VirtualIdentityUtils.getWorkerIndex(actorId).toString) + .asInstanceOf[BufferedItemWriter[Tuple]] + writer.putOne(state.toTuple) + writer.close() + } + /** * Singal the port storage writer to flush the remaining buffer and wait for commits to finish so that * the output port is properly completed. If the output port does not need storage, no action will be done. 
@@ -280,6 +292,7 @@ class OutputManager( } private def setupOutputStorageWriterThread(portId: PortIdentity, storageUri: URI): Unit = { + this.storageUris(portId.id) = storageUri val bufferedItemWriter = DocumentFactory .openDocument(storageUri) ._1 diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala index a0c73b6506d..6793248d8a3 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala @@ -181,6 +181,8 @@ class RegionExecutionCoordinator( val actorRef = actorRefService.getActorRef(workerId) // Remove the actorRef so that no other actors can find the worker and send messages. actorRefService.removeActorRef(workerId) + asyncRPCClient.inputGateway.removeControlChannel(workerId) + asyncRPCClient.outputGateway.removeControlChannel(workerId) gracefulStop(actorRef, ScalaDuration(5, TimeUnit.SECONDS)).asTwitter() } }.toSeq @@ -209,14 +211,15 @@ class RegionExecutionCoordinator( regionExecution: RegionExecution, attempt: Int = 1 ): Future[Unit] = { - terminateWorkers(regionExecution).rescue { case err => - logger.warn( - s"Failed to terminate region ${region.id.id} on attempt $attempt. Retrying in ${killRetryDelay.inMilliseconds} ms.", - err - ) - Future - .sleep(killRetryDelay)(killRetryTimer) - .flatMap(_ => terminateWorkersWithRetry(regionExecution, attempt + 1)) + terminateWorkers(regionExecution).rescue { + case err => + logger.warn( + s"Failed to terminate region ${region.id.id} on attempt $attempt. 
Retrying in ${killRetryDelay.inMilliseconds} ms.", + err + ) + Future + .sleep(killRetryDelay)(killRetryTimer) + .flatMap(_ => terminateWorkersWithRetry(regionExecution, attempt + 1)) } } @@ -568,7 +571,18 @@ class RegionExecutionCoordinator( region.getOperator(outputPortId.opId).outputPorts(outputPortId.portId)._3 val schema = schemaOptional.getOrElse(throw new IllegalStateException("Schema is missing")) - DocumentFactory.createDocument(storageUriToAdd, schema) + + if (region.getOperators.exists(_.id.logicalOpId.id.startsWith("LoopEnd-operator-"))) { + try { + DocumentFactory.openDocument(storageUriToAdd) + } catch { + case _: Exception => + DocumentFactory.createDocument(storageUriToAdd, schema) + } + } else { + DocumentFactory.createDocument(storageUriToAdd, schema) + } + WorkflowExecutionsResource.insertOperatorPortResultUri( eid = eid, globalPortId = outputPortId, diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala index 6f34c9ed1e5..ea1539a6eae 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala @@ -19,8 +19,16 @@ package org.apache.texera.amber.engine.architecture.scheduling +import org.apache.texera.amber.core.executor.OpExecWithClassName +import org.apache.texera.amber.core.virtualidentity.OperatorIdentity +import org.apache.texera.amber.operator.loop.{LoopEndOpDesc, LoopStartOpDesc} +import org.apache.texera.amber.util.JSONUtils.objectMapper + case class Schedule(private val levelSets: Map[Int, Set[Region]]) extends Iterator[Set[Region]] { private var currentLevel = levelSets.keys.minOption.getOrElse(0) + private var loopStartLevel = currentLevel + private var iteration = 5 + private var i = 1 def getRegions: List[Region] = levelSets.values.flatten.toList @@ -31,4 
+39,11 @@ case class Schedule(private val levelSets: Map[Int, Set[Region]]) extends Iterat currentLevel += 1 regions } + + def loopBack(loopStartId: OperatorIdentity): Unit = + levelSets.collectFirst { + case (level, regions) + if regions.exists(_.getOperators.exists(_.id.logicalOpId == loopStartId)) => + level + }.foreach(currentLevel = _) } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala index 1c3ae89471b..a17a52dd895 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala @@ -21,13 +21,10 @@ package org.apache.texera.amber.engine.architecture.scheduling import com.twitter.util.Future import com.typesafe.scalalogging.LazyLogging +import org.apache.texera.amber.core.virtualidentity.OperatorIdentity import org.apache.texera.amber.core.workflow.{GlobalPortIdentity, PhysicalLink} -import org.apache.texera.amber.engine.architecture.common.{ - AkkaActorRefMappingService, - AkkaActorService -} -import org.apache.texera.amber.engine.architecture.controller.ControllerConfig -import org.apache.texera.amber.engine.architecture.controller.ExecutionStateUpdate +import org.apache.texera.amber.engine.architecture.common.{AkkaActorRefMappingService, AkkaActorService} +import org.apache.texera.amber.engine.architecture.controller.{ControllerConfig, ExecutionStateUpdate, WorkflowScheduler} import org.apache.texera.amber.engine.architecture.controller.execution.WorkflowExecution import org.apache.texera.amber.engine.common.rpc.AsyncRPCClient @@ -35,7 +32,7 @@ import java.util.concurrent.atomic.AtomicBoolean import scala.collection.mutable class WorkflowExecutionCoordinator( - getNextRegions: () => Set[Region], + 
workflowScheduler: WorkflowScheduler, workflowExecution: WorkflowExecution, controllerConfig: ControllerConfig, asyncRPCClient: AsyncRPCClient @@ -83,7 +80,7 @@ class WorkflowExecutionCoordinator( } // All existing regions are completed. Start the next region (if any). - val nextRegions = getNextRegions() + val nextRegions = workflowScheduler.getNextRegions if (nextRegions.isEmpty) { if (workflowExecution.isCompleted && completionNotified.compareAndSet(false, true)) { asyncRPCClient.sendToClient(ExecutionStateUpdate(workflowExecution.getState)) @@ -131,4 +128,8 @@ class WorkflowExecutionCoordinator( regionExecutionCoordinators.values.exists(!_.isCompleted) } + def loopBack(loopStartId: OperatorIdentity): Unit = { + workflowScheduler.schedule.loopBack(loopStartId) + } + } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala index 3aa5fa90a46..bc89f26a754 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala @@ -126,6 +126,7 @@ class DataProcessor( val outputState = executor.processState(state, port) if (outputState.isDefined) { outputManager.emitState(outputState.get) + outputManager.saveStateToStorageIfNeeded(state, port) } } catch safely { case e => diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala index 10fbbc44a2c..91b22a98d9e 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala +++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala @@ -21,31 +21,19 @@ package org.apache.texera.amber.engine.architecture.worker.managers import io.grpc.MethodDescriptor import org.apache.texera.amber.config.ApplicationConfig +import org.apache.texera.amber.core.state.State import org.apache.texera.amber.core.storage.DocumentFactory import org.apache.texera.amber.core.storage.model.VirtualDocument import org.apache.texera.amber.core.tuple.Tuple -import org.apache.texera.amber.core.virtualidentity.{ - ActorVirtualIdentity, - ChannelIdentity, - EmbeddedControlMessageIdentity -} +import org.apache.texera.amber.core.virtualidentity.{ActorVirtualIdentity, ChannelIdentity, EmbeddedControlMessageIdentity} import org.apache.texera.amber.engine.architecture.messaginglayer.OutputManager.toPartitioner -import org.apache.texera.amber.engine.architecture.rpc.controlcommands.EmbeddedControlMessageType.{ - NO_ALIGNMENT, - PORT_ALIGNMENT -} +import org.apache.texera.amber.engine.architecture.rpc.controlcommands.EmbeddedControlMessageType.{NO_ALIGNMENT, PORT_ALIGNMENT} import org.apache.texera.amber.engine.architecture.rpc.controlcommands._ import org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn -import org.apache.texera.amber.engine.architecture.rpc.workerservice.WorkerServiceGrpc.{ - METHOD_END_CHANNEL, - METHOD_START_CHANNEL -} +import org.apache.texera.amber.engine.architecture.rpc.workerservice.WorkerServiceGrpc.{METHOD_END_CHANNEL, METHOD_START_CHANNEL} import org.apache.texera.amber.engine.architecture.sendsemantics.partitionings.Partitioning -import org.apache.texera.amber.engine.architecture.worker.WorkflowWorker.{ - DPInputQueueElement, - FIFOMessageElement -} -import org.apache.texera.amber.engine.common.ambermessage.{DataFrame, WorkflowFIFOMessage} +import org.apache.texera.amber.engine.architecture.worker.WorkflowWorker.{DPInputQueueElement, FIFOMessageElement} +import 
org.apache.texera.amber.engine.common.ambermessage.{DataFrame, StateFrame, WorkflowFIFOMessage} import org.apache.texera.amber.util.VirtualIdentityUtils.getFromActorIdForInputPortStorage import java.net.URI @@ -106,6 +94,14 @@ class InputPortMaterializationReaderThread( } // Flush any remaining tuples in the buffer. if (buffer.nonEmpty) flush() + val state_document = DocumentFactory.openDocument(uri.resolve("state"))._1.asInstanceOf[VirtualDocument[Tuple]] + val stateReadIterator = state_document.get() + + if (stateReadIterator.hasNext) { + val state = State(Option(stateReadIterator.next())) + inputMessageQueue.put(FIFOMessageElement(WorkflowFIFOMessage(channelId, getSequenceNumber, StateFrame(state)))) + } + emitECM(METHOD_END_CHANNEL, PORT_ALIGNMENT) isFinished.set(true) } catch { diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/common/rpc/AsyncRPCClient.scala b/amber/src/main/scala/org/apache/texera/amber/engine/common/rpc/AsyncRPCClient.scala index 704ebd7f476..f7e26803b47 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/common/rpc/AsyncRPCClient.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/common/rpc/AsyncRPCClient.scala @@ -27,7 +27,10 @@ import org.apache.texera.amber.core.virtualidentity.{ EmbeddedControlMessageIdentity } import org.apache.texera.amber.engine.architecture.controller.ClientEvent -import org.apache.texera.amber.engine.architecture.messaginglayer.NetworkOutputGateway +import org.apache.texera.amber.engine.architecture.messaginglayer.{ + NetworkInputGateway, + NetworkOutputGateway +} import org.apache.texera.amber.engine.architecture.rpc.controlcommands._ import org.apache.texera.amber.engine.architecture.rpc.controllerservice.ControllerServiceFs2Grpc import org.apache.texera.amber.engine.architecture.rpc.controlreturns.{ @@ -125,7 +128,8 @@ object AsyncRPCClient { } class AsyncRPCClient( - outputGateway: NetworkOutputGateway, + val inputGateway: NetworkInputGateway, + val outputGateway: 
NetworkOutputGateway, val actorId: ActorVirtualIdentity ) extends AmberLogging { diff --git a/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala b/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala index 72fb1c364e5..92582afdd2b 100644 --- a/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala +++ b/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala @@ -247,6 +247,8 @@ object WorkflowExecutionsResource { OPERATOR_PORT_EXECUTIONS.RESULT_URI ) .values(eid.id.toInt, globalPortId.serializeAsString, uri.toString) + .onConflict() + .doNothing() .execute() } diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala index 3226c9d2fe7..92abd9041c6 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala @@ -41,14 +41,15 @@ final case class State(tuple: Option[Tuple] = None, passToAllDownstream: Boolean def apply(key: String): Any = get(key) + def schema: Schema = + Schema(data.map { + case (name, (attrType, _)) => + new Attribute(name, attrType) + }.toList) + def toTuple: Tuple = Tuple - .builder( - Schema(data.map { - case (name, (attrType, _)) => - new Attribute(name, attrType) - }.toList) - ) + .builder(schema) .addSequentially(data.values.map(_._2).toArray) .build() diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala index 15949ef4717..ae37def667e 100644 --- 
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala @@ -72,6 +72,7 @@ object DocumentFactory { case RESULT => StorageConfig.icebergTableResultNamespace case CONSOLE_MESSAGES => StorageConfig.icebergTableConsoleMessagesNamespace case RUNTIME_STATISTICS => StorageConfig.icebergTableRuntimeStatisticsNamespace + case STATE => "state" case _ => throw new IllegalArgumentException(s"Resource type $resourceType is not supported") } @@ -119,6 +120,7 @@ object DocumentFactory { case RESULT => StorageConfig.icebergTableResultNamespace case CONSOLE_MESSAGES => StorageConfig.icebergTableConsoleMessagesNamespace case RUNTIME_STATISTICS => StorageConfig.icebergTableRuntimeStatisticsNamespace + case STATE => "state" case _ => throw new IllegalArgumentException(s"Resource type $resourceType is not supported") } diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/VFSURIFactory.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/VFSURIFactory.scala index 3513ac5ecd8..990776a69f0 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/VFSURIFactory.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/VFSURIFactory.scala @@ -34,6 +34,7 @@ object VFSResourceType extends Enumeration { val RESULT: Value = Value("result") val RUNTIME_STATISTICS: Value = Value("runtimeStatistics") val CONSOLE_MESSAGES: Value = Value("consoleMessages") + val STATE: Value = Value("state") } object VFSURIFactory { diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergTableWriter.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergTableWriter.scala index 549cb4b9d17..25b6df58001 100644 --- 
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergTableWriter.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergTableWriter.scala @@ -29,7 +29,7 @@ import org.apache.iceberg.io.{DataWriter, OutputFile} import org.apache.iceberg.parquet.Parquet import org.apache.iceberg.{Schema, Table} -import java.nio.file.Paths +import java.nio.file.{Files, Path, Paths} import scala.collection.mutable.ArrayBuffer /** @@ -107,9 +107,12 @@ private[storage] class IcebergTableWriter[T]( private def flushBuffer(): Unit = { if (buffer.nonEmpty) { // Create a unique file path using the writer's identifier and the filename index - val filepath = Paths.get(table.location()).resolve(s"${writerIdentifier}_${filenameIdx}") - // Increment the filename index by 1 - filenameIdx += 1 + var filepath: Path = null + do { + filepath = Paths.get(table.location()).resolve(s"${writerIdentifier}_$filenameIdx") + filenameIdx += 1 + } while (Files.exists(filepath)) + val outputFile: OutputFile = table.io().newOutputFile(filepath.toString) // Create a Parquet data writer to write a new file val dataWriter: DataWriter[Record] = Parquet diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala index 931596b1bf8..bef0c95cefb 100644 --- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala @@ -138,6 +138,7 @@ import org.apache.texera.amber.operator.visualization.volcanoPlot.VolcanoPlotOpD import org.apache.texera.amber.operator.visualization.waterfallChart.WaterfallChartOpDesc import org.apache.texera.amber.operator.visualization.wordCloud.WordCloudOpDesc import org.apache.commons.lang3.builder.{EqualsBuilder, HashCodeBuilder, ToStringBuilder} 
+import org.apache.texera.amber.operator.loop.{LoopEndOpDesc, LoopStartOpDesc} import org.apache.texera.amber.operator.sklearn.testing.SklearnTestingOpDesc import org.apache.texera.amber.operator.visualization.stripChart.StripChartOpDesc @@ -205,6 +206,8 @@ trait StateTransferFunc new Type(value = classOf[TypeCastingOpDesc], name = "TypeCasting"), new Type(value = classOf[LimitOpDesc], name = "Limit"), new Type(value = classOf[SleepOpDesc], name = "Sleep"), + new Type(value = classOf[LoopStartOpDesc], name = "LoopStart"), + new Type(value = classOf[LoopEndOpDesc], name = "LoopEnd"), new Type(value = classOf[RandomKSamplingOpDesc], name = "RandomKSampling"), new Type(value = classOf[ReservoirSamplingOpDesc], name = "ReservoirSampling"), new Type(value = classOf[HashJoinOpDesc[String]], name = "HashJoin"), diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala new file mode 100644 index 00000000000..045aa87cba9 --- /dev/null +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.operator.loop + +import com.fasterxml.jackson.annotation.JsonProperty +import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle +import org.apache.texera.amber.core.executor.OpExecWithCode +import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} +import org.apache.texera.amber.core.workflow.{InputPort, OutputPort, PhysicalOp} +import org.apache.texera.amber.operator.LogicalOp +import org.apache.texera.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo} + +class LoopEndOpDesc extends LogicalOp { + @JsonProperty(required = true, defaultValue = "i += 1") + @JsonSchemaTitle("Update") + var update: String = _ + + @JsonProperty(required = true, defaultValue = "i < len(table)") + @JsonSchemaTitle("Condition") + var condition: String = _ + + override def getPhysicalOp( + workflowId: WorkflowIdentity, + executionId: ExecutionIdentity + ): PhysicalOp = { + val pythonCode = + try { + generatePythonCode() + } catch { + case ex: Throwable => + s"#EXCEPTION DURING CODE GENERATION: ${ex.getMessage}" + } + PhysicalOp + .oneToOnePhysicalOp( + workflowId, + executionId, + operatorIdentifier, + OpExecWithCode(pythonCode, "python") + ) + .withInputPorts(operatorInfo.inputPorts) + .withOutputPorts(operatorInfo.outputPorts) + .withSuggestedWorkerNum(1) + .withParallelizable(false) + } + + override def operatorInfo: OperatorInfo = + OperatorInfo( + "Loop End", + "Loop End", + OperatorGroupConstants.CONTROL_GROUP, + inputPorts = List(InputPort()), + outputPorts = List(OutputPort()) + ) + + def generatePythonCode(): String = { + s""" + |from pytexera import * + |class ProcessLoopEndOperator(LoopEndOperator): + | @overrides + | def process_state(self, state: State, port: int) -> Optional[State]: + | from pickle import loads + | self.state = state.to_dict() + | self.state["table"] = 
loads(self.state["table"]) + |         exec("$update", {}, self.state) + |         return None + | + |     @overrides + |     def condition(self) -> bool: + |         exec("output = $condition", {}, self.state) + |         return self.state["output"] + |""".stripMargin + } +} diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpDesc.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpDesc.scala new file mode 100644 index 00000000000..14d4c69c546 --- /dev/null +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpDesc.scala @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.texera.amber.operator.loop + +import com.fasterxml.jackson.annotation.JsonProperty +import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle +import org.apache.texera.amber.core.executor.OpExecWithCode +import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} +import org.apache.texera.amber.core.workflow.{InputPort, OutputPort, PhysicalOp} +import org.apache.texera.amber.operator.LogicalOp +import org.apache.texera.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo} + +class LoopStartOpDesc extends LogicalOp { + @JsonProperty(required = true, defaultValue = "i = 0") + @JsonSchemaTitle("Initialization") + var initialization: String = _ + + @JsonProperty(required = true, defaultValue = "table.iloc[i]") + @JsonSchemaTitle("Output") + var output: String = _ + + override def getPhysicalOp( + workflowId: WorkflowIdentity, + executionId: ExecutionIdentity + ): PhysicalOp = { + val pythonCode = + try { + generatePythonCode() + } catch { + case ex: Throwable => + s"#EXCEPTION DURING CODE GENERATION: ${ex.getMessage}" + } + PhysicalOp + .oneToOnePhysicalOp( + workflowId, + executionId, + operatorIdentifier, + OpExecWithCode(pythonCode, "python") + ) + .withInputPorts(operatorInfo.inputPorts) + .withOutputPorts(operatorInfo.outputPorts) + .withSuggestedWorkerNum(1) + .withParallelizable(false) + } + + override def operatorInfo: OperatorInfo = + OperatorInfo( + "Loop Start", + "Loop Start", + OperatorGroupConstants.CONTROL_GROUP, + inputPorts = List(InputPort()), + outputPorts = List(OutputPort()) + ) + + def generatePythonCode(): String = { + s""" + |from pytexera import * + |class ProcessLoopStartOperator(LoopStartOperator): + | @overrides + | def open(self): + | self.state = {} + | exec("$initialization", {}, self.state) + | + | @overrides + | def process_state(self, state: State, port: int) -> Optional[State]: + | self.state.update(state.to_dict()) + | print(self.state) + | return None + | + | 
@overrides + | def process_table(self, table: Table, port: int) -> Iterator[Optional[TableLike]]: + | self.state["table"] = table + | exec("output = $output", {}, self.state) + | yield self.state["output"] + |""".stripMargin + } +} diff --git a/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts b/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts index f1532bf9a9d..592fe09c598 100644 --- a/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts +++ b/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts @@ -342,7 +342,6 @@ export class WorkflowEditorComponent implements OnInit, AfterViewInit, OnDestroy body: { fill: "rgba(158,158,158,0.2)", pointerEvents: "none", - visibility: "hidden", }, }, }, diff --git a/frontend/src/assets/operator_images/LoopEnd.png b/frontend/src/assets/operator_images/LoopEnd.png new file mode 100644 index 00000000000..ee0f9ab6fac Binary files /dev/null and b/frontend/src/assets/operator_images/LoopEnd.png differ diff --git a/frontend/src/assets/operator_images/LoopStart.png b/frontend/src/assets/operator_images/LoopStart.png new file mode 100644 index 00000000000..7e5be023cdf Binary files /dev/null and b/frontend/src/assets/operator_images/LoopStart.png differ