1111from .ducklake import DuckLakeClient
1212from .ftp import FTPClient
1313from .models import BaseLocalFile , BaseRemoteFile
14+ from .extensions import Parquet
1415
1516Base = declarative_base ()
1617
@@ -108,8 +109,7 @@ def _attach_client_catalog(self, name: str, path: str):
108109 existing = conn .exec_driver_sql (q , (abs_path ,)).fetchone ()
109110
110111 if not existing :
111- conn .exec_driver_sql (f"ATTACH '{ abs_path } ' AS {
112- name } (READ_ONLY)" )
112+ conn .exec_driver_sql (f"ATTACH '{ abs_path } ' AS { name } (READ_ONLY)" )
113113
114114 async def __aexit__ (self , exc_type , exc_val , exc_tb ):
115115 if self ._ducklake :
@@ -122,7 +122,7 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):
122122
123123 def _get_dest_path (self , file : BaseRemoteFile ) -> Path :
124124 client_name = file .client .name .lower ()
125- dataset_name = getattr ( file .parent , " name" , "unknown_dataset" )
125+ dataset_name = file .dataset . name . lower ( )
126126
127127 group_name = ""
128128 if hasattr (file , "group" ) and file .group :
@@ -148,8 +148,7 @@ async def _update_state(
148148 ):
149149 with self .Session () as session :
150150 record = (
151- session .query (LocalFileState ).filter_by (
152- path = str (local_path )).first ()
151+ session .query (LocalFileState ).filter_by (path = str (local_path )).first ()
153152 )
154153 if not record :
155154 record = LocalFileState (
@@ -231,36 +230,38 @@ async def download_to_parquet(
231230 file : BaseRemoteFile ,
232231 token : str = None ,
233232 callback : Callable [[int , int ], None ] = None ,
234- ):
233+ ) -> Parquet :
235234 local_file = await self .download (
236235 file = file ,
237236 token = token ,
238237 callback = callback ,
239238 )
240239
241- if hasattr (local_file , "to_parquet" ):
242- original_path = local_file .path
240+ if not hasattr (local_file , "to_parquet" ):
241+ raise NotImplementedError (
242+ f"{ local_file } can't be converted to Parquet" ,
243+ )
243244
244- parquet_file = await local_file .to_parquet ( callback = callback )
245+ original_path = local_file .path
245246
246- await self ._update_state (
247- local_path = parquet_file .path ,
248- remote_path = file .path ,
249- client_name = file .client .name .lower (),
250- status = DownloadStatus .COMPLETED ,
251- year = file .year ,
252- month = file .month ,
253- state = file .state ,
254- group = getattr (file .group , "name" , None ),
255- )
247+ parquet_file = await local_file .to_parquet (callback = callback )
256248
257- if original_path .exists () and original_path != parquet_file .path :
258- original_path .unlink ()
259- await self ._delete_record (str (original_path ))
249+ await self ._update_state (
250+ local_path = parquet_file .path ,
251+ remote_path = file .path ,
252+ client_name = file .client .name .lower (),
253+ status = DownloadStatus .COMPLETED ,
254+ year = file .year ,
255+ month = file .month ,
256+ state = file .state ,
257+ group = getattr (file .group , "name" , None ),
258+ )
260259
261- return parquet_file
260+ if original_path .exists () and original_path != parquet_file .path :
261+ original_path .unlink ()
262+ await self ._delete_record (str (original_path ))
262263
263- return local_file
264+ return parquet_file
264265
265266 def get_local_hierarchy (self ):
266267 with self .Session () as session :
0 commit comments