Skip to content

Commit 242521f

Browse files
committed
work in progress
1 parent ffc7d5b commit 242521f

2 files changed

Lines changed: 38 additions & 21 deletions

File tree

src/query_farm_server_base/flight_inventory.py

Lines changed: 31 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,6 @@
1313
AirportSerializedSchema,
1414
)
1515

16-
# SCHEMA_BASE_URL = "https://schemas.beta.database.flights"
17-
# SCHEMA_BUCKET_NAME = "schemas.beta.database.flights"
18-
1916
# This is the ZStandard compression level used for individual FlightInfo
2017
# objects. Because the schemas are pretty small, we can use a lower level,
2118
# preferring fast decompression.
@@ -72,6 +69,14 @@ def serialize(self) -> bytes:
7269
FlightInventoryWithMetadata = tuple[flight.FlightInfo, FlightSchemaMetadata]
7370

7471

72+
@dataclass
73+
class UploadParameters:
74+
s3_client: S3Client
75+
base_url: str
76+
bucket_name: str
77+
bucket_prefix: str | None = None
78+
79+
7580
def upload_and_generate_schema_list(
7681
*,
7782
flight_service_name: str,
@@ -80,10 +85,11 @@ def upload_and_generate_schema_list(
8085
skip_upload: bool,
8186
catalog_version: int,
8287
catalog_version_fixed: bool,
83-
schema_base_url: str,
84-
schema_bucket_name: str,
85-
s3_client: S3Client,
86-
s3_bucket_prefix: str | None = None,
88+
upload_parameters: UploadParameters,
89+
# schema_base_url: str,
90+
# schema_bucket_name: str,
91+
# s3_client: S3Client,
92+
# s3_bucket_prefix: str | None = None,
8793
serialize_inline: bool = False,
8894
) -> AirportSerializedCatalogRoot:
8995
serialized_schema_data: list[AirportSerializedSchema] = []
@@ -99,7 +105,12 @@ def upload_and_generate_schema_list(
99105
#
100106
# I think we can suffer with this problem for a bit longer.
101107
#
102-
s3_bucket_prefix = s3_bucket_prefix.rstrip("/") + "/" if s3_bucket_prefix else ""
108+
if upload_parameters and upload_parameters.bucket_prefix:
109+
upload_parameters.bucket_prefix = (
110+
upload_parameters.bucket_prefix.rstrip("/") + "/"
111+
if upload_parameters.bucket_prefix
112+
else ""
113+
)
103114

104115
for catalog_name, schema_names in flight_inventory.items():
105116
for schema_name, schema_items in schema_names.items():
@@ -110,16 +121,14 @@ def upload_and_generate_schema_list(
110121
assert packed_flight_info
111122

112123
uploaded_schema_contents = schema_uploader.upload(
113-
s3_client=s3_client,
124+
s3_client=upload_parameters.s3_client,
114125
data=packed_flight_info,
115126
compression_level=SCHEMA_COMPRESSION_LEVEL,
116-
key_prefix=f"{s3_bucket_prefix}schemas/{flight_service_name}/{catalog_name}",
117-
bucket=schema_bucket_name,
127+
key_prefix=f"{upload_parameters.bucket_prefix}schemas/{flight_service_name}/{catalog_name}",
128+
bucket=upload_parameters.bucket_name,
118129
skip_upload=skip_upload or serialize_inline,
119130
)
120131

121-
schema_path = f"{schema_base_url}/{uploaded_schema_contents.s3_path}"
122-
123132
assert uploaded_schema_contents.compressed_data
124133

125134
all_schema_flights_serialized.append(
@@ -136,7 +145,9 @@ def upload_and_generate_schema_list(
136145
if schema_name in schema_details
137146
else "",
138147
contents=AirportSerializedContentsWithSHA256Hash(
139-
url=schema_path if not serialize_inline else None,
148+
url=f"{upload_parameters.base_url}/{uploaded_schema_contents.s3_path}"
149+
if not serialize_inline
150+
else None,
140151
sha256=uploaded_schema_contents.sha256_hash,
141152
serialized=None,
142153
),
@@ -148,20 +159,21 @@ def upload_and_generate_schema_list(
148159
assert all_packed
149160

150161
all_schema_contents_upload = schema_uploader.upload(
151-
s3_client=s3_client,
162+
s3_client=upload_parameters.s3_client,
152163
data=all_packed,
153-
key_prefix=f"{s3_bucket_prefix}schemas/{flight_service_name}",
154-
bucket=schema_bucket_name,
164+
key_prefix=f"{upload_parameters.bucket_prefix}schemas/{flight_service_name}",
165+
bucket=upload_parameters.bucket_name,
155166
compression_level=None, # Don't compress since all contained schemas are compressed
156167
skip_upload=skip_upload or serialize_inline,
157168
)
158-
all_schema_path = f"{schema_base_url}/{all_schema_contents_upload.s3_path}"
159169

160170
return AirportSerializedCatalogRoot(
161171
schemas=serialized_schema_data,
162172
contents=AirportSerializedContentsWithSHA256Hash(
163173
sha256=all_schema_contents_upload.sha256_hash,
164-
url=all_schema_path if not serialize_inline else None,
174+
url=f"{upload_parameters.base_url}/{all_schema_contents_upload.s3_path}"
175+
if not serialize_inline
176+
else None,
165177
serialized=all_schema_contents_upload.compressed_data if serialize_inline else None,
166178
),
167179
version_info=server.GetCatalogVersionResult(

src/query_farm_server_base/schema_uploader.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,12 @@ def _compress_and_prefix_with_length(data: bytes, compression_level: int) -> byt
2525
return result
2626

2727

28+
def _hash_value(data: bytes) -> str:
29+
return hashlib.sha256(data).hexdigest()
30+
31+
2832
def _build_sha256_key_name_and_hash(key_prefix: str, data: bytes) -> tuple[str, str]:
29-
sha256_hash = hashlib.sha256(data).hexdigest()
33+
sha256_hash = _hash_value(data)
3034
return (
3135
f"{key_prefix}/{hex_to_url_safe_characters(sha256_hash)}",
3236
sha256_hash,
@@ -36,7 +40,7 @@ def _build_sha256_key_name_and_hash(key_prefix: str, data: bytes) -> tuple[str,
3640
def upload(
3741
*,
3842
compression_level: int | None,
39-
s3_client: S3Client,
43+
s3_client: S3Client | None,
4044
data: bytes,
4145
key_prefix: str,
4246
bucket: str,
@@ -62,6 +66,7 @@ def upload(
6266
s3_path, sha256_hash = _build_sha256_key_name_and_hash(key_prefix, data)
6367

6468
if not skip_upload:
69+
assert s3_client, "S3 client must be provided if not skipping upload"
6570
s3_client.put_object(
6671
Body=data,
6772
Bucket=bucket,

0 commit comments

Comments
 (0)