Skip to content

Commit 606562f

Browse files
bpiwowar and claude committed
feat: add adaptive directory polling to detect new event files on NFS
Extract AdaptivePoller from PolledFile as a generic, reusable class for adaptive watchdog+polling scheduling. Use it for both file change polling (via PolledFile composition) and directory scanning in DirectoryWatch.

On NFS/network filesystems, watchdog doesn't fire on_created events for remotely created files, so event file rotation (e.g. events-9.jsonl to events-10.jsonl) was never detected.

DirectoryWatch now periodically scans for new files using AdaptivePoller, which adjusts scan frequency based on watchdog reliability: it polls more often when watchdog misses events and less often when watchdog is working.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 69e5d72 commit 606562f

2 files changed

Lines changed: 342 additions & 45 deletions

File tree

src/experimaestro/filewatcher.py

Lines changed: 199 additions & 45 deletions
Original file line number | Diff line number | Diff line change
@@ -103,93 +103,183 @@ def _create_observer(watcher_type: WatcherType, polling_interval: float = 1.0):
103103

104104

105105
# =============================================================================
106-
# PolledFile (moved from scheduler/polling.py)
106+
# AdaptivePoller — generic adaptive polling with watchdog reliability tracking
107107
# =============================================================================
108108

109109

110110
@dataclass
111-
class PolledFile:
112-
"""State for a file being watched with polling fallback
111+
class AdaptivePoller:
112+
"""Adaptive polling scheduler with watchdog reliability tracking.
113+
114+
Tracks how reliably watchdog detects changes and adjusts the polling
115+
interval accordingly using Polyak (exponential moving) averaging:
113116
114-
Uses adaptive polling with Polyak averaging to track:
115-
- watchdog_reliability: How reliably watchdog detects changes (0.0-1.0)
116-
- estimated_change_interval: Expected time between file changes
117+
- When watchdog is reliable → poll less frequently
118+
- When watchdog misses changes (poll detects them) → poll more frequently
119+
- When changes happen rapidly → poll more frequently
120+
- When nothing changes for a while → slow down polling
117121
118-
The polling interval adapts based on both metrics:
119-
- When watchdog is reliable, poll less frequently
120-
- When file changes rapidly, poll more frequently
121-
- Uses exponential moving average (Polyak) for smooth adaptation
122+
This class is generic: it owns no I/O or check logic. The caller is
123+
responsible for performing the actual check and calling the appropriate
124+
notification method based on who detected the change.
122125
"""
123126

124-
path: Path
125-
last_size: int = 0
126-
last_change_time: float = field(default_factory=time.time)
127-
poll_interval: float = 0.5
128-
next_poll: float = 0.0
127+
min_interval: float = 0.5
128+
max_interval: float = 30.0
129129

130-
# Adaptive tracking using Polyak averaging
130+
# State
131131
watchdog_reliability: float = 0.5
132132
estimated_change_interval: float = 5.0
133-
134-
# Polling interval bounds
135-
MIN_INTERVAL: float = field(default=0.5, repr=False)
136-
MAX_INTERVAL: float = field(default=30.0, repr=False)
133+
poll_interval: float = 0.5
134+
next_poll: float = 0.0
135+
last_change_time: float = field(default_factory=time.time)
137136

138137
# Polyak averaging parameters
139-
POLYAK_ALPHA: float = field(default=0.3, repr=False)
140-
RELIABILITY_ALPHA: float = field(default=0.2, repr=False)
138+
_polyak_alpha: float = field(default=0.3, repr=False)
139+
_reliability_alpha: float = field(default=0.2, repr=False)
141140

142141
def schedule_next(self) -> None:
143-
"""Schedule the next poll time"""
142+
"""Schedule the next poll time."""
144143
self.next_poll = time.time() + self.poll_interval
145144

146145
def _update_change_interval(self) -> None:
147-
"""Update estimated change interval using Polyak averaging"""
146+
"""Update estimated change interval using Polyak averaging."""
148147
now = time.time()
149-
observed_interval = now - self.last_change_time
150-
observed_interval = max(0.1, min(observed_interval, self.MAX_INTERVAL * 2))
148+
observed = now - self.last_change_time
149+
observed = max(0.1, min(observed, self.max_interval * 2))
151150
self.estimated_change_interval = (
152-
self.POLYAK_ALPHA * observed_interval
153-
+ (1 - self.POLYAK_ALPHA) * self.estimated_change_interval
151+
self._polyak_alpha * observed
152+
+ (1 - self._polyak_alpha) * self.estimated_change_interval
154153
)
155154
self.last_change_time = now
156155

157156
def _compute_poll_interval(self) -> None:
158-
"""Compute poll interval based on reliability and change frequency"""
159-
base = max(self.MIN_INTERVAL, self.estimated_change_interval * 0.5)
157+
"""Compute poll interval based on reliability and change frequency."""
158+
base = max(self.min_interval, self.estimated_change_interval * 0.5)
160159
self.poll_interval = min(
161-
base + (self.MAX_INTERVAL - base) * self.watchdog_reliability,
162-
self.MAX_INTERVAL,
160+
base + (self.max_interval - base) * self.watchdog_reliability,
161+
self.max_interval,
163162
)
164163

165164
def on_poll_detected_change(self) -> None:
166-
"""Called when POLLING detected a change (watchdog missed it)"""
165+
"""Called when POLLING detected a change (watchdog missed it)."""
167166
self._update_change_interval()
168167
self.watchdog_reliability = (
169-
self.RELIABILITY_ALPHA * 0.0
170-
+ (1 - self.RELIABILITY_ALPHA) * self.watchdog_reliability
168+
self._reliability_alpha * 0.0
169+
+ (1 - self._reliability_alpha) * self.watchdog_reliability
171170
)
172171
self._compute_poll_interval()
173172
self.schedule_next()
174173

175174
def on_watchdog_detected_change(self) -> None:
176-
"""Called when WATCHDOG detected a change"""
175+
"""Called when WATCHDOG detected a change."""
177176
self._update_change_interval()
178177
self.watchdog_reliability = (
179-
self.RELIABILITY_ALPHA * 1.0
180-
+ (1 - self.RELIABILITY_ALPHA) * self.watchdog_reliability
178+
self._reliability_alpha * 1.0
179+
+ (1 - self._reliability_alpha) * self.watchdog_reliability
181180
)
182181
self._compute_poll_interval()
183182
self.schedule_next()
184183

185184
def on_no_activity(self) -> None:
186-
"""Called when no changes detected during poll"""
185+
"""Called when no changes detected during poll."""
187186
self.estimated_change_interval = min(
188-
self.estimated_change_interval * 1.2, self.MAX_INTERVAL * 2
187+
self.estimated_change_interval * 1.2, self.max_interval * 2
189188
)
190189
self._compute_poll_interval()
191190
self.schedule_next()
192191

192+
@property
193+
def is_due(self) -> bool:
194+
"""Whether this poller is due for a check."""
195+
return time.time() >= self.next_poll
196+
197+
198+
# =============================================================================
199+
# PolledFile — file-specific polling using AdaptivePoller
200+
# =============================================================================
201+
202+
203+
@dataclass
204+
class PolledFile:
205+
"""State for a file being watched with adaptive polling fallback.
206+
207+
Combines file-specific state (path, last_size) with an AdaptivePoller
208+
that manages the polling schedule and watchdog reliability tracking.
209+
"""
210+
211+
path: Path
212+
last_size: int = 0
213+
poller: AdaptivePoller = field(default_factory=AdaptivePoller)
214+
215+
# --- Delegate properties for backward compatibility ---
216+
217+
@property
218+
def poll_interval(self) -> float:
219+
return self.poller.poll_interval
220+
221+
@poll_interval.setter
222+
def poll_interval(self, value: float) -> None:
223+
self.poller.poll_interval = value
224+
225+
@property
226+
def next_poll(self) -> float:
227+
return self.poller.next_poll
228+
229+
@next_poll.setter
230+
def next_poll(self, value: float) -> None:
231+
self.poller.next_poll = value
232+
233+
@property
234+
def watchdog_reliability(self) -> float:
235+
return self.poller.watchdog_reliability
236+
237+
@watchdog_reliability.setter
238+
def watchdog_reliability(self, value: float) -> None:
239+
self.poller.watchdog_reliability = value
240+
241+
@property
242+
def estimated_change_interval(self) -> float:
243+
return self.poller.estimated_change_interval
244+
245+
@estimated_change_interval.setter
246+
def estimated_change_interval(self, value: float) -> None:
247+
self.poller.estimated_change_interval = value
248+
249+
@property
250+
def MIN_INTERVAL(self) -> float:
251+
return self.poller.min_interval
252+
253+
@MIN_INTERVAL.setter
254+
def MIN_INTERVAL(self, value: float) -> None:
255+
self.poller.min_interval = value
256+
257+
@property
258+
def MAX_INTERVAL(self) -> float:
259+
return self.poller.max_interval
260+
261+
@MAX_INTERVAL.setter
262+
def MAX_INTERVAL(self, value: float) -> None:
263+
self.poller.max_interval = value
264+
265+
def schedule_next(self) -> None:
266+
self.poller.schedule_next()
267+
268+
def _compute_poll_interval(self) -> None:
269+
self.poller._compute_poll_interval()
270+
271+
def on_poll_detected_change(self) -> None:
272+
"""Called when POLLING detected a change (watchdog missed it)."""
273+
self.poller.on_poll_detected_change()
274+
275+
def on_watchdog_detected_change(self) -> None:
276+
"""Called when WATCHDOG detected a change."""
277+
self.poller.on_watchdog_detected_change()
278+
279+
def on_no_activity(self) -> None:
280+
"""Called when no changes detected during poll."""
281+
self.poller.on_no_activity()
282+
193283
def update_size(self) -> bool | None:
194284
"""Update the last known size.
195285
@@ -422,6 +512,15 @@ def __init__(
422512
TailedFilePool(max_open=max_open_files) if enable_tailing else None
423513
)
424514

515+
# Directory scanning poller: detects new files when watchdog misses
516+
# on_created events (e.g. NFS, GPFS, Lustre)
517+
self._dir_poller = AdaptivePoller(
518+
min_interval=min_poll_interval,
519+
max_interval=max_poll_interval,
520+
)
521+
self._dir_poller.schedule_next()
522+
self._known_files: set[Path] = set()
523+
425524
# Set up watchdog
426525
self._setup_watchdog()
427526

@@ -441,6 +540,9 @@ def add_file(self, path: Path) -> None:
441540
if not self._file_filter(path):
442541
return
443542

543+
# Track in known files so directory polling doesn't re-discover it
544+
self._known_files.add(path)
545+
444546
with self._lock:
445547
if path in self._files:
446548
return
@@ -452,10 +554,12 @@ def add_file(self, path: Path) -> None:
452554
polled = PolledFile(
453555
path=path,
454556
last_size=size,
455-
poll_interval=self._min_poll_interval,
557+
poller=AdaptivePoller(
558+
min_interval=self._min_poll_interval,
559+
max_interval=self._max_poll_interval,
560+
poll_interval=self._min_poll_interval,
561+
),
456562
)
457-
polled.MIN_INTERVAL = self._min_poll_interval
458-
polled.MAX_INTERVAL = self._max_poll_interval
459563
polled.schedule_next()
460564
self._files[path] = polled
461565
logger.debug("DirectoryWatch: tracking %s", path)
@@ -541,15 +645,25 @@ def _handle_file_change(self, path: Path, from_watchdog: bool = False) -> None:
541645
logger.exception("Error in change callback for %s", path)
542646

543647
def poll(self) -> float:
544-
"""Poll all tracked files. Returns time until next poll."""
648+
"""Poll all tracked files and scan directory for new files.
649+
650+
Returns time until next poll.
651+
"""
545652
now = time.time()
546653
next_wake = now + 1.0
547654

655+
# Directory scan: detect new files that watchdog missed
656+
if self._dir_poller.is_due:
657+
self._scan_for_new_files()
658+
659+
if self._dir_poller.next_poll < next_wake:
660+
next_wake = self._dir_poller.next_poll
661+
548662
with self._lock:
549663
files_snapshot = list(self._files.values())
550664

551665
for polled in files_snapshot:
552-
if polled.next_poll <= now:
666+
if polled.poller.is_due:
553667
changed = polled.update_size()
554668
if changed is None:
555669
# File was deleted — trigger deletion callback
@@ -578,6 +692,44 @@ def poll(self) -> float:
578692

579693
return max(0.1, next_wake - time.time())
580694

695+
def _scan_for_new_files(self) -> None:
696+
"""Scan the watched directory for new files not yet tracked.
697+
698+
Uses the adaptive _dir_poller to adjust scan frequency based on
699+
whether watchdog is reliably detecting new file creation.
700+
On NFS/network filesystems, watchdog often misses on_created
701+
events, so the poller will gradually increase scan frequency.
702+
"""
703+
try:
704+
if self._recursive:
705+
current_files = set(p for p in self._path.rglob("*") if p.is_file())
706+
else:
707+
current_files = set(p for p in self._path.iterdir() if p.is_file())
708+
except OSError:
709+
self._dir_poller.on_no_activity()
710+
return
711+
712+
# Filter to matching files
713+
current_files = {p for p in current_files if self._file_filter(p)}
714+
715+
new_files = current_files - self._known_files
716+
self._known_files = current_files
717+
718+
if new_files:
719+
# Poll discovered new files → watchdog missed them
720+
self._dir_poller.on_poll_detected_change()
721+
for path in sorted(new_files):
722+
logger.debug("Directory poll discovered new file: %s", path)
723+
if self._on_created:
724+
try:
725+
self._on_created(path)
726+
except Exception:
727+
logger.exception("Error in created callback for %s", path)
728+
self.add_file(path)
729+
self._handle_file_change(path, from_watchdog=False)
730+
else:
731+
self._dir_poller.on_no_activity()
732+
581733
def close(self) -> None:
582734
"""Stop watching and release resources."""
583735
if self._closed:
@@ -635,6 +787,8 @@ def on_created(self, event):
635787
if watch and watch._file_filter(Path(event.src_path)):
636788
path = Path(event.src_path)
637789
logger.debug("Watchdog on_created: %s", path)
790+
# Watchdog detected creation → increase directory poller reliability
791+
watch._dir_poller.on_watchdog_detected_change()
638792
if watch._on_created:
639793
try:
640794
watch._on_created(path)

0 commit comments

Comments
 (0)