Context
I've been building a platform that periodically monitors DataSUS sources and downloads files when they are new or updated, with the goal of compiling a collection of epidemiological indicators. While working on this, I ran into some concurrency issues worth discussing.
Current Behavior
There are two related issues when using the download methods concurrently:
1. download in multi-threaded settings
download relies on FTPSingleton, a globally shared FTP connection. When two threads call download concurrently, they end up sharing the same socket, which can interleave FTP commands and corrupt the transfer, even when downloading different files. This will usually raise exceptions.
2. async_download under concurrent async tasks
async_download uses aioftp, which is built on asyncio streams with no internal locking. The event loop can interleave await points inside aioftp's internals, mixing up FTP control channel commands across concurrent transfers. This can silently produce corrupted files even when each call targets a different file.
The tricky part is that when the file gets corrupted, no exceptions are raised (I can reproduce it relatively consistently by download multiple files at once). Eventually one .parquet will get corrupted.
Proposed Solution
Remove FTPSingleton and give each download call its own connection. Establishing an FTP connection is lightweight and the transfer time dominates completely, so there's negligible overhead:
from contextlib import contextmanager
from ftplib import FTP
@contextmanager
def ftp_connection():
ftp = FTP("ftp.datasus.gov.br")
ftp.login()
try:
yield ftp
finally:
try:
ftp.quit()
except Exception:
ftp.close()
Remove aioftp and reimplement async_download using asyncio.to_thread:
async def async_download(self, local_dir: str = CACHEPATH) -> Data:
return await asyncio.to_thread(self.download, local_dir)
This delegates to the (now thread-safe) download in a thread pool. async/await still works and user code doesn't break. Concurrency control (e.g. limiting simultaneous connections via asyncio.Semaphore) is left to the user.
Trade-offs
- Performance:
to_thread uses system threads, which are slightly heavier than asyncio coroutines. In practice this should be irrelevant as the file download itself dominates the time taken.
- Dependency: removes
aioftp as a dependency.
Worth noting that multi-threading is not going to make downloads faster and FTP servers will likely reject multiple simultaneous connections. The main goal is to prevent that PySUS breaks user code that is multi-threaded.
Happy to work on a PR if you are aligned with this direction!
Context
I've been building a platform that periodically monitors DataSUS sources and downloads files when they are new or updated, with the goal of compiling a collection of epidemiological indicators. While working on this, I ran into some concurrency issues worth discussing.
Current Behavior
There are two related issues when using the download methods concurrently:
1.
downloadin multi-threaded settingsdownloadrelies on FTPSingleton, a globally shared FTP connection. When two threads calldownloadconcurrently, they end up sharing the same socket, which can interleave FTP commands and corrupt the transfer, even when downloading different files. This will usually raise exceptions.2.
async_downloadunder concurrent async tasksasync_downloadusesaioftp, which is built onasynciostreams with no internal locking. The event loop can interleaveawaitpoints insideaioftp's internals, mixing up FTP control channel commands across concurrent transfers. This can silently produce corrupted files even when each call targets a different file.The tricky part is that when the file gets corrupted, no exceptions are raised (I can reproduce it relatively consistently by download multiple files at once). Eventually one
.parquetwill get corrupted.Proposed Solution
Remove
FTPSingletonand give eachdownloadcall its own connection. Establishing an FTP connection is lightweight and the transfer time dominates completely, so there's negligible overhead:Remove
aioftpand reimplementasync_downloadusingasyncio.to_thread:This delegates to the (now thread-safe)
downloadin a thread pool.async/awaitstill works and user code doesn't break. Concurrency control (e.g. limiting simultaneous connections viaasyncio.Semaphore) is left to the user.Trade-offs
to_threaduses system threads, which are slightly heavier thanasynciocoroutines. In practice this should be irrelevant as the file download itself dominates the time taken.aioftpas a dependency.Worth noting that multi-threading is not going to make downloads faster and FTP servers will likely reject multiple simultaneous connections. The main goal is to prevent that PySUS breaks user code that is multi-threaded.
Happy to work on a PR if you are aligned with this direction!