@@ -637,8 +637,6 @@ def image_normalize(
637637 if engine is None or engine .casefold () != "opencv" :
638638 raise ValueError ("Must specify the engine, supported value is 'opencv'." )
639639
640- import base64
641-
642640 import bigframes .bigquery as bbq
643641 import bigframes .blob ._functions as blob_func
644642 import bigframes .pandas as bpd
@@ -662,25 +660,25 @@ def image_normalize(
662660 df ["beta" ] = beta
663661 df ["norm_type" ] = norm_type
664662 df ["ext" ] = ext # type: ignore
665- df ["verbose" ] = verbose
666663 res = self ._df_apply_udf (df , image_normalize_udf )
667664
665+ normalized_content_b64_series = res ._apply_unary_op (
666+ ops .JSONValue (json_path = "$.content" )
667+ )
668+ normalized_bytes = bbq .sql_scalar (
669+ "FROM_BASE64({0})" , columns = [normalized_content_b64_series ]
670+ )
668671 if verbose :
669672 normalized_status_series = res ._apply_unary_op (
670673 ops .JSONValue (json_path = "$.status" )
671674 )
672- normalized_content_b64_series = res ._apply_unary_op (
673- ops .JSONValue (json_path = "$.content" )
674- )
675- # TODO this is not allowed, I need to find another way
676- normalized_bytes = base64 .b64decode (normalized_content_b64_series )
677675 results_df = bpd .DataFrame (
678676 {"status" : normalized_status_series , "content" : normalized_bytes }
679677 )
680678 results_struct = bbq .struct (results_df ).rename ("normalized_results" )
681679 return results_struct
682680 else :
683- return res
681+ return normalized_bytes . rename ( "normalized_bytes" )
684682
685683 if isinstance (dst , str ):
686684 dst = os .path .join (dst , "" )
@@ -708,22 +706,31 @@ def image_normalize(
708706 df ["beta" ] = beta
709707 df ["norm_type" ] = norm_type
710708 df ["ext" ] = ext # type: ignore
711- # df["verbose"] = verbose
712709
713710 res = self ._df_apply_udf (df , image_normalize_udf )
714711 res .cache () # to execute the udf
715712
713+ normalized_content_series = res ._apply_unary_op (
714+ ops .JSONValue (json_path = "$.content" )
715+ )
716+ normalized_content_blobs = normalized_content_series .str .to_blob (
717+ connection = connection
718+ )
719+
716720 if verbose :
717721 normalized_status_series = res ._apply_unary_op (
718722 ops .JSONValue (json_path = "$.status" )
719723 )
720724 results_df = bpd .DataFrame (
721- {"status" : normalized_status_series , "content" : dst }
725+ {
726+ "status" : normalized_status_series ,
727+ "content" : normalized_content_blobs ,
728+ }
722729 )
723730 results_struct = bbq .struct (results_df ).rename ("normalized_results" )
724731 return results_struct
725732 else :
726- return dst
733+ return normalized_content_blobs . rename ( "normalized_content" )
727734
728735 def pdf_extract (
729736 self ,
@@ -781,7 +788,7 @@ def pdf_extract(
781788
782789 extracted_content_series = res ._apply_unary_op (
783790 ops .JSONValue (json_path = "$.content" )
784- ). rename ( "extracted_content" )
791+ )
785792
786793 if verbose :
787794 status_series = res ._apply_unary_op (ops .JSONValue (json_path = "$.status" ))
@@ -791,7 +798,7 @@ def pdf_extract(
791798 results_struct = bbq .struct (results_df ).rename ("extracted_results" )
792799 return results_struct
793800 else :
794- return extracted_content_series
801+ return extracted_content_series . rename ( "extracted_content" )
795802
796803 def pdf_chunk (
797804 self ,
@@ -865,9 +872,8 @@ def pdf_chunk(
865872
866873 res = self ._df_apply_udf (df , pdf_chunk_udf )
867874
868- chunked_content_series = bbq .json_extract_string_array (res , "$.content" ).rename (
869- "chunked_content"
870- )
875+ chunked_content_series = bbq .json_extract_string_array (res , "$.content" )
876+
871877 if verbose :
872878 status_series = res ._apply_unary_op (ops .JSONValue (json_path = "$.status" ))
873879 results_df = bpd .DataFrame (
@@ -876,7 +882,7 @@ def pdf_chunk(
876882 resultes_struct = bbq .struct (results_df ).rename ("chunked_results" )
877883 return resultes_struct
878884 else :
879- return chunked_content_series
885+ return chunked_content_series . rename ( "chunked_content" )
880886
881887 def audio_transcribe (
882888 self ,
@@ -940,7 +946,7 @@ def audio_transcribe(
940946
941947 transcribed_content_series = cast (
942948 bpd .Series , transcribed_results ["ml_generate_text_llm_result" ]
943- ). rename ( "transcribed_content" )
949+ )
944950
945951 if verbose :
946952 transcribed_status_series = cast (
@@ -955,4 +961,4 @@ def audio_transcribe(
955961 results_struct = bbq .struct (results_df ).rename ("transcription_results" )
956962 return results_struct
957963 else :
958- return transcribed_content_series
964+ return transcribed_content_series . rename ( "transcribed_content" )
0 commit comments