Skip to content

Commit d8466bc

Browse files
committed
work in in progress
1 parent 8a9da39 commit d8466bc

1 file changed

Lines changed: 91 additions & 4 deletions

File tree

src/query_farm_server_base/server.py

Lines changed: 91 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,12 +67,16 @@ class AirportSerializedCatalogRoot(BaseModel):
6767
R = TypeVar("R")
6868

6969

70+
class ExchangeOperation(str, Enum):
71+
INSERT = "insert"
72+
UPDATE = "update"
73+
DELETE = "delete"
74+
75+
7076
class ActionType(str, Enum):
7177
"""
72-
All supported Arrow Flight action types.
73-
74-
This enum provides a single source of truth for action type strings used
75-
throughout the Flight server implementation.
78+
These are the DoAction action types that are supported by calling the
79+
separate action handlers.
7680
"""
7781

7882
# Schema modification actions
@@ -542,6 +546,42 @@ def impl_do_exchange(
542546
) -> None:
543547
raise NotImplementedError("impl_do_exchange not implemented")
544548

549+
def _unimplemented_exchange_operation(self, operation: ExchangeOperation) -> NoReturn:
550+
raise flight.FlightUnavailableError(f"The {operation} operation is not implemented")
551+
552+
def exchange_insert(
553+
self,
554+
*,
555+
context: CallContext[AccountType, TokenType],
556+
descriptor: flight.FlightDescriptor,
557+
reader: flight.MetadataRecordBatchReader,
558+
writer: flight.MetadataRecordBatchWriter,
559+
return_chunks: bool,
560+
) -> int:
561+
self._unimplemented_exchange_operation(ExchangeOperation.INSERT)
562+
563+
def exchange_delete(
564+
self,
565+
*,
566+
context: CallContext[AccountType, TokenType],
567+
descriptor: flight.FlightDescriptor,
568+
reader: flight.MetadataRecordBatchReader,
569+
writer: flight.MetadataRecordBatchWriter,
570+
return_chunks: bool,
571+
) -> int:
572+
self._unimplemented_exchange_operation(ExchangeOperation.DELETE)
573+
574+
def exchange_update(
575+
self,
576+
*,
577+
context: CallContext[AccountType, TokenType],
578+
descriptor: flight.FlightDescriptor,
579+
reader: flight.MetadataRecordBatchReader,
580+
writer: flight.MetadataRecordBatchWriter,
581+
return_chunks: bool,
582+
) -> int:
583+
self._unimplemented_exchange_operation(ExchangeOperation.UPDATE)
584+
545585
def do_exchange(
546586
self,
547587
context: flight.ServerCallContext,
@@ -562,6 +602,53 @@ def do_exchange(
562602
logger=logger,
563603
)
564604

605+
header_middleware = context.context.get_middleware("headers")
606+
airport_operation_headers = header_middleware.client_headers.get("airport-operation")
607+
if airport_operation_headers is not None and len(airport_operation_headers) > 0:
608+
airport_operation = airport_operation_headers[0]
609+
610+
return_chunks_headers = header_middleware.client_headers.get("return-chunks")
611+
if return_chunks_headers is None or len(return_chunks_headers) == 0:
612+
raise flight.FlightServerError(
613+
"The return-chunks header is required for this operation."
614+
)
615+
return_chunks: bool = return_chunks_headers[0] == "1"
616+
617+
last_metadata: Any
618+
if airport_operation == ExchangeOperation.INSERT:
619+
keys_inserted = self.exchange_insert(
620+
context=call_context,
621+
descriptor=descriptor,
622+
reader=reader,
623+
writer=writer,
624+
return_chunks=return_chunks,
625+
)
626+
last_metadata = {"total_inserted": keys_inserted}
627+
elif airport_operation == ExchangeOperation.UPDATE:
628+
keys_updated = self.exchange_update(
629+
context=call_context,
630+
descriptor=descriptor,
631+
reader=reader,
632+
writer=writer,
633+
return_chunks=return_chunks,
634+
)
635+
last_metadata = {"total_updated": keys_updated}
636+
elif airport_operation == ExchangeOperation.DELETE:
637+
keys_deleted = self.exchange_delete(
638+
context=call_context,
639+
descriptor=descriptor,
640+
reader=reader,
641+
writer=writer,
642+
return_chunks=return_chunks,
643+
)
644+
last_metadata = {"total_deleted": keys_deleted}
645+
else:
646+
raise flight.FlightServerError(
647+
f"Unknown airport-operation header: {airport_operation}"
648+
)
649+
writer.write_metadata(msgpack.packb(last_metadata))
650+
writer.close()
651+
565652
return self.impl_do_exchange(
566653
context=call_context,
567654
descriptor=descriptor,

0 commit comments

Comments
 (0)