Skip to content

Commit dc3de63

Browse files
committed
fix: wrap methods in exception handlers
1 parent fc22e65 commit dc3de63

1 file changed

Lines changed: 171 additions & 151 deletions

File tree

src/query_farm_server_base/server.py

Lines changed: 171 additions & 151 deletions
Original file line numberDiff line numberDiff line change
@@ -294,28 +294,32 @@ def get_flight_info(
294294
context: flight.ServerCallContext,
295295
descriptor: flight.FlightDescriptor,
296296
) -> flight.FlightInfo:
297-
caller = self.credentials_from_context_(context)
298-
299-
logger = log.bind(
300-
**self.auth_logging_items(context, caller),
301-
descriptor=descriptor,
302-
)
303-
304-
logger.info(
305-
"get_flight_info",
306-
descriptor=descriptor,
307-
)
308-
309-
call_context = CallContext(
310-
context=context,
311-
caller=caller,
312-
logger=logger,
313-
)
314-
315-
return self.impl_get_flight_info(
316-
context=call_context,
317-
descriptor=descriptor,
318-
)
297+
try:
298+
caller = self.credentials_from_context_(context)
299+
300+
logger = log.bind(
301+
**self.auth_logging_items(context, caller),
302+
descriptor=descriptor,
303+
)
304+
305+
logger.info(
306+
"get_flight_info",
307+
descriptor=descriptor,
308+
)
309+
310+
call_context = CallContext(
311+
context=context,
312+
caller=caller,
313+
logger=logger,
314+
)
315+
316+
return self.impl_get_flight_info(
317+
context=call_context,
318+
descriptor=descriptor,
319+
)
320+
except Exception as e:
321+
logger.exception("get_flight_info", error=str(e))
322+
raise
319323

320324
def impl_do_action(
321325
self,
@@ -515,35 +519,39 @@ def pack_result(self, value: Any) -> Iterator[bytes]:
515519
def do_action(
516520
self, context: flight.ServerCallContext, action: flight.Action
517521
) -> Iterator[bytes]:
518-
caller = self.credentials_from_context_(context)
519-
520-
logger = log.bind(
521-
**self.auth_logging_items(context, caller),
522-
)
523-
524-
call_context = CallContext(
525-
context=context,
526-
caller=caller,
527-
logger=logger,
528-
)
529-
530-
logger.info("pre action", action=action)
531-
if handler := self.action_handlers_.get(action.type):
532-
parameters = handler.decoder(action)
533-
logger.debug(action.type, parameters=parameters)
534-
535-
result = handler.method(context=call_context, parameters=parameters)
536-
if handler.post_transform:
537-
result = handler.post_transform(result)
538-
if handler.empty_result:
539-
return iter([])
540-
return self.pack_result(result) if handler.pack_result else iter([result])
541-
542-
logger.debug(action.type, action=action)
543-
return self.impl_do_action(
544-
context=call_context,
545-
action=action,
546-
)
522+
try:
523+
caller = self.credentials_from_context_(context)
524+
525+
logger = log.bind(
526+
**self.auth_logging_items(context, caller),
527+
)
528+
529+
call_context = CallContext(
530+
context=context,
531+
caller=caller,
532+
logger=logger,
533+
)
534+
535+
if handler := self.action_handlers_.get(action.type):
536+
parameters = handler.decoder(action)
537+
logger.debug(action.type, parameters=parameters)
538+
539+
result = handler.method(context=call_context, parameters=parameters)
540+
if handler.post_transform:
541+
result = handler.post_transform(result)
542+
if handler.empty_result:
543+
return iter([])
544+
result = self.pack_result(result) if handler.pack_result else iter([result])
545+
return result
546+
else:
547+
logger.debug("action", type=action.type, action=action)
548+
return self.impl_do_action(
549+
context=call_context,
550+
action=action,
551+
)
552+
except Exception as e:
553+
logger.exception("do_action", error=str(e))
554+
raise
547555

548556
def impl_do_exchange(
549557
self,
@@ -598,72 +606,76 @@ def do_exchange(
598606
reader: flight.MetadataRecordBatchReader,
599607
writer: flight.MetadataRecordBatchWriter,
600608
) -> None:
601-
caller = self.credentials_from_context_(context)
602-
603-
logger = log.bind(
604-
**self.auth_logging_items(context, caller),
605-
descriptor=descriptor,
606-
)
607-
608-
call_context = CallContext(
609-
context=context,
610-
caller=caller,
611-
logger=logger,
612-
)
613-
614-
header_middleware = context.context.get_middleware("headers")
615-
airport_operation_headers = header_middleware.client_headers.get("airport-operation")
616-
if airport_operation_headers is not None and len(airport_operation_headers) > 0:
617-
airport_operation = airport_operation_headers[0]
618-
619-
return_chunks_headers = header_middleware.client_headers.get("return-chunks")
620-
if return_chunks_headers is None or len(return_chunks_headers) == 0:
621-
raise flight.FlightServerError(
622-
"The return-chunks header is required for this operation."
623-
)
624-
return_chunks: bool = return_chunks_headers[0] == "1"
625-
626-
last_metadata: Any
627-
if airport_operation == ExchangeOperation.INSERT:
628-
keys_inserted = self.exchange_insert(
629-
context=call_context,
630-
descriptor=descriptor,
631-
reader=reader,
632-
writer=writer,
633-
return_chunks=return_chunks,
634-
)
635-
last_metadata = {"total_inserted": keys_inserted}
636-
elif airport_operation == ExchangeOperation.UPDATE:
637-
keys_updated = self.exchange_update(
638-
context=call_context,
639-
descriptor=descriptor,
640-
reader=reader,
641-
writer=writer,
642-
return_chunks=return_chunks,
643-
)
644-
last_metadata = {"total_updated": keys_updated}
645-
elif airport_operation == ExchangeOperation.DELETE:
646-
keys_deleted = self.exchange_delete(
647-
context=call_context,
648-
descriptor=descriptor,
649-
reader=reader,
650-
writer=writer,
651-
return_chunks=return_chunks,
652-
)
653-
last_metadata = {"total_deleted": keys_deleted}
654-
else:
655-
raise flight.FlightServerError(
656-
f"Unknown airport-operation header: {airport_operation}"
657-
)
658-
writer.write_metadata(msgpack.packb(last_metadata))
659-
writer.close()
660-
661-
return self.impl_do_exchange(
662-
context=call_context,
663-
descriptor=descriptor,
664-
reader=reader,
665-
writer=writer,
666-
)
609+
try:
610+
caller = self.credentials_from_context_(context)
611+
612+
logger = log.bind(
613+
**self.auth_logging_items(context, caller),
614+
descriptor=descriptor,
615+
)
616+
617+
call_context = CallContext(
618+
context=context,
619+
caller=caller,
620+
logger=logger,
621+
)
622+
623+
header_middleware = context.context.get_middleware("headers")
624+
airport_operation_headers = header_middleware.client_headers.get("airport-operation")
625+
if airport_operation_headers is not None and len(airport_operation_headers) > 0:
626+
airport_operation = airport_operation_headers[0]
627+
628+
return_chunks_headers = header_middleware.client_headers.get("return-chunks")
629+
if return_chunks_headers is None or len(return_chunks_headers) == 0:
630+
raise flight.FlightServerError(
631+
"The return-chunks header is required for this operation."
632+
)
633+
return_chunks: bool = return_chunks_headers[0] == "1"
634+
635+
last_metadata: Any
636+
if airport_operation == ExchangeOperation.INSERT:
637+
keys_inserted = self.exchange_insert(
638+
context=call_context,
639+
descriptor=descriptor,
640+
reader=reader,
641+
writer=writer,
642+
return_chunks=return_chunks,
643+
)
644+
last_metadata = {"total_inserted": keys_inserted}
645+
elif airport_operation == ExchangeOperation.UPDATE:
646+
keys_updated = self.exchange_update(
647+
context=call_context,
648+
descriptor=descriptor,
649+
reader=reader,
650+
writer=writer,
651+
return_chunks=return_chunks,
652+
)
653+
last_metadata = {"total_updated": keys_updated}
654+
elif airport_operation == ExchangeOperation.DELETE:
655+
keys_deleted = self.exchange_delete(
656+
context=call_context,
657+
descriptor=descriptor,
658+
reader=reader,
659+
writer=writer,
660+
return_chunks=return_chunks,
661+
)
662+
last_metadata = {"total_deleted": keys_deleted}
663+
else:
664+
raise flight.FlightServerError(
665+
f"Unknown airport-operation header: {airport_operation}"
666+
)
667+
writer.write_metadata(msgpack.packb(last_metadata))
668+
writer.close()
669+
670+
return self.impl_do_exchange(
671+
context=call_context,
672+
descriptor=descriptor,
673+
reader=reader,
674+
writer=writer,
675+
)
676+
except Exception as e:
677+
logger.exception("do_exchange", error=str(e))
678+
raise
667679

668680
def impl_do_get(
669681
self,
@@ -676,24 +688,28 @@ def impl_do_get(
676688
def do_get(
677689
self, context: flight.ServerCallContext, ticket: flight.Ticket
678690
) -> flight.RecordBatchStream:
679-
caller = self.credentials_from_context_(context)
691+
try:
692+
caller = self.credentials_from_context_(context)
680693

681-
logger = log.bind(
682-
**self.auth_logging_items(context, caller),
683-
)
694+
logger = log.bind(
695+
**self.auth_logging_items(context, caller),
696+
)
684697

685-
logger.info("do_get", ticket=ticket)
698+
logger.info("do_get", ticket=ticket)
686699

687-
call_context = CallContext(
688-
context=context,
689-
caller=caller,
690-
logger=logger,
691-
)
700+
call_context = CallContext(
701+
context=context,
702+
caller=caller,
703+
logger=logger,
704+
)
692705

693-
return self.impl_do_get(
694-
context=call_context,
695-
ticket=ticket,
696-
)
706+
return self.impl_do_get(
707+
context=call_context,
708+
ticket=ticket,
709+
)
710+
except Exception as e:
711+
logger.exception("do_get", error=str(e))
712+
raise
697713

698714
def impl_do_put(
699715
self,
@@ -712,23 +728,27 @@ def do_put(
712728
reader: flight.MetadataRecordBatchReader,
713729
writer: flight.FlightMetadataWriter,
714730
) -> None:
715-
caller = self.credentials_from_context_(context)
716-
717-
logger = log.bind(
718-
**self.auth_logging_items(context, caller),
719-
)
720-
721-
logger.info("do_put", descriptor=descriptor)
722-
723-
call_context = CallContext(
724-
context=context,
725-
caller=caller,
726-
logger=logger,
727-
)
728-
729-
return self.impl_do_put(
730-
context=call_context,
731-
descriptor=descriptor,
732-
reader=reader,
733-
writer=writer,
734-
)
731+
try:
732+
caller = self.credentials_from_context_(context)
733+
734+
logger = log.bind(
735+
**self.auth_logging_items(context, caller),
736+
)
737+
738+
logger.info("do_put", descriptor=descriptor)
739+
740+
call_context = CallContext(
741+
context=context,
742+
caller=caller,
743+
logger=logger,
744+
)
745+
746+
return self.impl_do_put(
747+
context=call_context,
748+
descriptor=descriptor,
749+
reader=reader,
750+
writer=writer,
751+
)
752+
except Exception as e:
753+
logger.exception("do_put", error=str(e))
754+
raise

0 commit comments

Comments
 (0)