refactor(sync): subduction#23
Conversation
|
Important Review skippedDraft detected. Please check the settings in the CodeRabbit UI or the ⚙️ Run configurationConfiguration used: Path: .coderabbit.yaml Review profile: CHILL Plan: Pro Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
📝 WalkthroughWalkthroughReplaces the samod-based repo/sync stack with a new async runtime using subduction/sedimentree, adds typed 32-byte IDs (DocId32/PeerId32), removes the per-doc broker and iroh module, introduces a runtime actor for live docs/peer sync, and migrates consumer crates to the new async BigDocHandle/PeerId APIs. Changes
Sequence Diagram(s)sequenceDiagram
participant Client as Caller
participant BigRepo as BigRepo
participant Runtime as BigRepoRuntime
participant Storage as Subduction/Sedimentree
participant Peer as RemotePeer
Client->>BigRepo: sync_doc_with_peer(doc_id, peer_id, timeout)
BigRepo->>Runtime: request sync_doc_with_peer(doc_id, peer_id, timeout)
Runtime->>Storage: ensure/load doc blobs (load_doc/export)
Storage-->>Runtime: Option<Automerge snapshot / blobs>
Runtime->>Peer: sync_with_peer(endpoint, doc_id)
Peer-->>Runtime: SyncDocOutcome (Success | NotFound | TransportError | IoError)
alt Success
Runtime->>Storage: persist received blobs
Runtime->>Runtime: apply blobs, compute patches
Runtime->>BigRepo: merge updated doc into live bundle
BigRepo-->>Client: SyncDocOutcome::Success
else Failure
Runtime-->>BigRepo: propagate outcome
BigRepo-->>Client: SyncDocOutcome::TransportError/NotFound/IoError
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 8
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
src/am_utils_rs/Cargo.toml (1)
14-19:⚠️ Potential issue | 🟠 MajorKeep the new sync stack behind the existing feature flags.
These dependencies are unconditional now, so
am_utils_rsstill pulls the full subduction/keyhive/websocket stack even when a consumer opts out ofrepo/iroh. That breaks the crate’s currentdefault-features = falsesplit and can drag native-only build requirements into lighter targets.Also applies to: 56-67
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/am_utils_rs/Cargo.toml` around lines 14 - 19, Unconditional dependencies were added for the new sync stack; instead mark tokio-tungstenite, sqlx, serde_json, and irpc as optional and wire them into the existing feature flags (e.g., repo and iroh) so they are only pulled when those features are enabled. Concretely: make each dependency optional (optional = true) and add them to the appropriate [features] entries (e.g., include "tokio-tungstenite", "sqlx", "serde_json", "irpc" under the "repo" and/or "iroh" feature lists) so the crate’s default-features = false split is preserved.src/daybook_core/sync/full.rs (2)
1489-1494:⚠️ Potential issue | 🔴 CriticalKeep a single peer's delete event from cancelling every endpoint's doc sync.
self.scheduler.docs_to_stop.insert(doc_id.clone())is still doc-scoped, butbatch_stop_docs()now clears allDocSyncTaskKeys for thatdoc_id. If another endpoint still has that document queued or active, its sync gets cancelled and never re-queued from this path, leaving the remaining requester stuck stale.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/daybook_core/sync/full.rs` around lines 1489 - 1494, The current call to self.scheduler.docs_to_stop.insert(doc_id.clone()) is doc-scoped and causes batch_stop_docs() to clear every DocSyncTaskKey for that doc_id, cancelling syncs for other endpoints; change this to insert the endpoint-scoped DocSyncTaskKey for this particular requester (e.g., construct and insert DocSyncTaskKey::new(doc_id.clone(), endpoint_id.clone()) or call the scheduler API that accepts a DocSyncTaskKey) so batch_stop_docs() only clears the specific endpoint's task rather than every task for doc_id.
422-428:⚠️ Potential issue | 🟠 MajorSurface unexpected full-sync worker failures instead of only logging them.
This is the top-level actor task spawned at lines 422-428. When
fut.awaitreturns an error, the task only logs it and completes normally, leaving failures hidden from tokio's perspective. Replace the error logging with.unwrap()to ensure unexpected errors are surfaced immediately, following the pattern:tokio::spawn(async { fut.await.unwrap() }).🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/daybook_core/sync/full.rs` around lines 422 - 428, The spawned FullSyncWorker task currently swallows errors by logging them in the block that awaits fut (see join_handle and fut), so replace the error-handling block with a direct unwrap of fut's Result to surface unexpected failures to Tokio; specifically, change the async closure passed to tokio::task::spawn so it simply awaits fut and calls .unwrap() (i.e., fut.await.unwrap()) instead of matching Err and calling warn!, and keep the tracing::info_span!("FullSyncWorker task") instrumentation.
🧹 Nitpick comments (4)
AGENTS.md (1)
133-133: Consider rephrasing for better readability.The phrase "to a task completion" is grammatically awkward. Consider "to complete a task" or "until task completion" instead.
♻️ Suggested improvement
-Don't hack and boil the ocean to a task completion, you'll be asked to do it again if you do it wrong. +Don't hack and boil the ocean to complete a task, you'll be asked to do it again if you do it wrong.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@AGENTS.md` at line 133, The sentence uses the awkward phrase "to a task completion"; update the wording to improve readability by replacing "to a task completion" with a clearer alternative such as "to complete a task" or "until task completion" and adjust surrounding words to keep the original meaning and tone (locate the phrase "to a task completion" and edit that clause accordingly).src/daybook_core/rt/dispatch.rs (1)
157-159: Avoid exposing the raw SQLite pool fromDispatchRepo.This accessor exists only so
src/daybook_core/rt/switch.rscan reach into dispatch storage, which couples two repos through internals. Prefer threading the pool, or the loadedSwitchStore, through the boot path instead of adding a repo escape hatch.Based on learnings: "If a requested change requires changing interfaces, break and change the interfaces instead of trying to shim around this. Shims break abstraction boundaries and make code confusing."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/daybook_core/rt/dispatch.rs` around lines 157 - 159, The db_pool() accessor on DispatchRepo exposes internal SqlitePool and should be removed; instead, pass the SqlitePool or the already-initialized SwitchStore through your boot/initialization path into the module that needs it (the code currently in switch.rs). Remove the pub(crate) fn db_pool(&self) -> &sqlx::SqlitePool method from DispatchRepo, add a constructor or function parameter to accept sqlx::SqlitePool (or the concrete SwitchStore) where SwitchRepo/switch.rs is created/used, and update any callers in switch.rs to consume that injected dependency rather than calling DispatchRepo::db_pool; this keeps the repository abstraction intact and eliminates the escape hatch.src/daybook_core/sync/bootstrap.rs (1)
475-492: Consider removing the polling loop after direct document import.After the changes at lines 456-473, the core documents are directly imported via
import_doc. The subsequent polling loop (lines 475-492) that waits for documents to appear may now be redundant, asimport_docis synchronous and the documents should be immediately available.However, this could serve as a defensive fallback if there's any async persistence lag. If intentional, consider adding a brief comment explaining why the loop is retained.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/daybook_core/sync/bootstrap.rs` around lines 475 - 492, The polling loop that repeatedly calls big_repo.find_doc_handle and sleeps (the loop using attempts, big_repo.find_doc_handle(&bootstrap.app_doc_id)/(&bootstrap.drawer_doc_id), the debug! call, and tokio::time::sleep) is redundant because core docs are already imported synchronously via import_doc; remove this entire loop and the attempts variable so you don’t wait again after import_doc completes, and delete the associated debug/logging and sleep; if you intended to keep a defensive fallback instead, replace the loop with a concise comment adjacent to import_doc explaining why an async persistence lag might require polling and keep only an explicit, documented retry path.src/xtask/main.rs (1)
21-22: Duplicated OCI media-type constants.These constants duplicate the values defined in
daybook_core::plugs(lines 621-623). If the values need to change, they must be updated in both locations, which could lead to drift.If
xtaskintentionally avoids depending ondaybook_corefor faster builds, consider adding a comment documenting that these must stay in sync with the canonical definitions. Otherwise, importing them directly would prevent accidental divergence.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/xtask/main.rs` around lines 21 - 22, The two duplicated constants OCI_PLUG_ARTIFACT_TYPE and OCI_PLUG_MANIFEST_LAYER_MEDIA_TYPE duplicate definitions in daybook_core::plugs; either replace these local constants by importing the canonical values from daybook_core::plugs (preferred) or, if xtask must avoid the dependency, add a clear comment above the constants stating they must stay in sync with daybook_core::plugs (include the module path daybook_core::plugs and a TODO to remove duplication), so future changes don’t drift.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@AGENTS.md`:
- Line 137: Update the sentence "If it looks or inelegant what you're doing,
you're very liekly misalinged and wasting effort." in AGENTS.md by correcting
the two misspellings: change "liekly" to "likely" and "misalinged" to
"misaligned" (leave the rest of the sentence as-is or adjust surrounding grammar
if desired); locate the exact line containing that sentence to apply the
replacements.
In `@src/am_utils_rs/repo.rs`:
- Around line 357-376: load_live_bundle currently races: two callers can miss
live_bundles, both await load_automerge and create distinct LiveDocBundle Arcs.
Fix by re-checking the map after the await before inserting so only one bundle
is materialized: after creating bundle (in load_live_bundle) call
self.live_bundles.get(document_id).and_then(|e| e.value().upgrade()) and if it
returns Some(existing) drop the newly created bundle and return existing;
otherwise insert the new Arc::downgrade(&bundle) and return it. Alternatively
implement an entry-style placeholder in live_bundles so only the first waiter
creates the LiveDocBundle; reference load_live_bundle, live_bundles,
LiveDocBundle, and runtime.acquire_doc_lease when applying the change.
- Around line 618-652: The current with_document_with_origin calls
operation(&mut doc) directly on the locked live Automerge document before
persisting, risking memory/state drift if apply_commit_delta fails; change this
to run the operation on a temporary clone of the document (obtain the live heads
with doc.get_heads(), clone the Automerge instance, call operation(&mut
doc_clone), compute after_heads, patches (doc_clone.diff), and changes
(doc_clone.get_changes) from the clone), then call
self.repo.apply_commit_delta(...) with those changes, and only after
apply_commit_delta returns successfully, merge or replace the live doc under
self.bundle.doc.lock().await with the committed changes (e.g., apply the change
bytes or swap in the clone) so the in-memory state only advances when
persistence succeeds.
In `@src/am_utils_rs/repo/runtime.rs`:
- Around line 529-555: When get_blobs() returns None we reconstruct before_doc
from storage for diffing but never rehydrate the runtime; ensure that when
load_blobs_from_storage(...) yields Some(blobs) you call
subduction.add_sedimentree(sedimentree_id, blobs.into_iter()) (or the
appropriate add method) before calling subduction.sync_with_peer(...). Locate
the block that sets before_doc (uses get_blobs, load_blobs_from_storage,
reconstruct_automerge_from_blobs) and after successfully obtaining blobs from
storage invoke subduction.add_sedimentree(...) with the same SedimentreeId so
the local runtime holds the blobs prior to sync_with_peer; preserve error
handling and only skip add_sedimentree when blobs are truly absent.
In `@src/daybook_core/plugs.rs`:
- Around line 750-755: The get_plugs_heads function currently panics on
with_document failures; change its signature to return Res<ChangeHashSet> (or
the module's Result alias) and propagate the error instead of calling expect:
await the with_document call and use ? (or map_err to convert the error into the
Res error type) to return Err when the handle read fails, e.g.
self.app_am_handle.with_document(|am_doc|
ChangeHashSet(am_doc.get_heads().into())).await.map_err(...)?, and update
callers accordingly; apply the same pattern used in diff_events/events_for_init
to the other similar sites (the other occurrences noted near lines 1006-1008) so
all handle-read failures are returned rather than panicking.
In `@src/daybook_core/sync.rs`:
- Around line 587-630: register_incoming_peer currently only does a read-side
contains_key and then proceeds to mutate connection tables, which allows
concurrent setups for the same endpoint and never emits the public
IncomingConnection event; make the registration atomic by taking the write lock
on active_peers up-front (or use the same reservation path as
connect_endpoint_addr) to check-and-reserve the endpoint before performing async
setup, emit the IncomingConnection event to subscribers after a successful
registration, and ensure cleanup on failure (remove_peer_connection and clear
any reserved state) if full_sync_handle.set_connection or other async steps
fail; reference register_incoming_peer, active_peers,
full_sync_handle.set_connection, rcx.big_repo.ensure_peer_connection and
rcx.big_repo.remove_peer_connection when making these changes.
In `@src/daybook_core/sync/full/scheduler.rs`:
- Around line 542-554: endpoint_has_doc_work currently only checks active_docs,
queued_tasks, and pending_tasks for SyncTask::Doc and thus misses active_imports
and queued/pending SyncTask::Import; update endpoint_has_doc_work to also
consider self.active_imports (matching endpoint_id) and any queued_tasks or
pending_tasks entries that are SyncTask::Import for the same endpoint_id so
refresh_peer_fully_synced_state (and PeerFullSynced decisions) see import work;
look for the function endpoint_has_doc_work, the fields active_imports,
queued_tasks, pending_tasks, and the enum variant SyncTask::Import to add those
checks in the same style as the existing Doc checks.
In `@src/xtask/Cargo.toml`:
- Around line 38-39: The Cargo manifest lists mismatched crate versions: irpc =
"0.13.0" and irpc-iroh = "0.12.0" (irpc-iroh depends on irpc ^0.12.0), which
will pull incompatible irpc versions; fix by aligning them—either change irpc to
"0.12.0" to match irpc-iroh or bump irpc-iroh to "0.13.0" so both use the same
release line; update the entries for irpc and/or irpc-iroh in Cargo.toml
accordingly.
---
Outside diff comments:
In `@src/am_utils_rs/Cargo.toml`:
- Around line 14-19: Unconditional dependencies were added for the new sync
stack; instead mark tokio-tungstenite, sqlx, serde_json, and irpc as optional
and wire them into the existing feature flags (e.g., repo and iroh) so they are
only pulled when those features are enabled. Concretely: make each dependency
optional (optional = true) and add them to the appropriate [features] entries
(e.g., include "tokio-tungstenite", "sqlx", "serde_json", "irpc" under the
"repo" and/or "iroh" feature lists) so the crate’s default-features = false
split is preserved.
In `@src/daybook_core/sync/full.rs`:
- Around line 1489-1494: The current call to
self.scheduler.docs_to_stop.insert(doc_id.clone()) is doc-scoped and causes
batch_stop_docs() to clear every DocSyncTaskKey for that doc_id, cancelling
syncs for other endpoints; change this to insert the endpoint-scoped
DocSyncTaskKey for this particular requester (e.g., construct and insert
DocSyncTaskKey::new(doc_id.clone(), endpoint_id.clone()) or call the scheduler
API that accepts a DocSyncTaskKey) so batch_stop_docs() only clears the specific
endpoint's task rather than every task for doc_id.
- Around line 422-428: The spawned FullSyncWorker task currently swallows errors
by logging them in the block that awaits fut (see join_handle and fut), so
replace the error-handling block with a direct unwrap of fut's Result to surface
unexpected failures to Tokio; specifically, change the async closure passed to
tokio::task::spawn so it simply awaits fut and calls .unwrap() (i.e.,
fut.await.unwrap()) instead of matching Err and calling warn!, and keep the
tracing::info_span!("FullSyncWorker task") instrumentation.
---
Nitpick comments:
In `@AGENTS.md`:
- Line 133: The sentence uses the awkward phrase "to a task completion"; update
the wording to improve readability by replacing "to a task completion" with a
clearer alternative such as "to complete a task" or "until task completion" and
adjust surrounding words to keep the original meaning and tone (locate the
phrase "to a task completion" and edit that clause accordingly).
In `@src/daybook_core/rt/dispatch.rs`:
- Around line 157-159: The db_pool() accessor on DispatchRepo exposes internal
SqlitePool and should be removed; instead, pass the SqlitePool or the
already-initialized SwitchStore through your boot/initialization path into the
module that needs it (the code currently in switch.rs). Remove the pub(crate) fn
db_pool(&self) -> &sqlx::SqlitePool method from DispatchRepo, add a constructor
or function parameter to accept sqlx::SqlitePool (or the concrete SwitchStore)
where SwitchRepo/switch.rs is created/used, and update any callers in switch.rs
to consume that injected dependency rather than calling DispatchRepo::db_pool;
this keeps the repository abstraction intact and eliminates the escape hatch.
In `@src/daybook_core/sync/bootstrap.rs`:
- Around line 475-492: The polling loop that repeatedly calls
big_repo.find_doc_handle and sleeps (the loop using attempts,
big_repo.find_doc_handle(&bootstrap.app_doc_id)/(&bootstrap.drawer_doc_id), the
debug! call, and tokio::time::sleep) is redundant because core docs are already
imported synchronously via import_doc; remove this entire loop and the attempts
variable so you don’t wait again after import_doc completes, and delete the
associated debug/logging and sleep; if you intended to keep a defensive fallback
instead, replace the loop with a concise comment adjacent to import_doc
explaining why an async persistence lag might require polling and keep only an
explicit, documented retry path.
In `@src/xtask/main.rs`:
- Around line 21-22: The two duplicated constants OCI_PLUG_ARTIFACT_TYPE and
OCI_PLUG_MANIFEST_LAYER_MEDIA_TYPE duplicate definitions in daybook_core::plugs;
either replace these local constants by importing the canonical values from
daybook_core::plugs (preferred) or, if xtask must avoid the dependency, add a
clear comment above the constants stating they must stay in sync with
daybook_core::plugs (include the module path daybook_core::plugs and a TODO to
remove duplication), so future changes don’t drift.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: b2c53125-1b3f-4a00-8472-3ba8eba64ac0
⛔ Files ignored due to path filters (1)
Cargo.lockis excluded by!**/*.lockand included by**/*
📒 Files selected for processing (44)
AGENTS.mdCargo.tomlsrc/am_utils_rs/Cargo.tomlsrc/am_utils_rs/ids.rssrc/am_utils_rs/lib.rssrc/am_utils_rs/repo.rssrc/am_utils_rs/repo/changes.rssrc/am_utils_rs/repo/changes/broker.rssrc/am_utils_rs/repo/iroh.rssrc/am_utils_rs/repo/iroh/codec.rssrc/am_utils_rs/repo/partition.rssrc/am_utils_rs/repo/runtime.rssrc/am_utils_rs/sync.rssrc/am_utils_rs/sync/peer.rssrc/daybook_cli/Cargo.tomlsrc/daybook_core/Cargo.tomlsrc/daybook_core/app.rssrc/daybook_core/config.rssrc/daybook_core/drawer.rssrc/daybook_core/drawer/events.rssrc/daybook_core/drawer/meta.rssrc/daybook_core/drawer/mutations.rssrc/daybook_core/drawer/queries.rssrc/daybook_core/drawer/tests.rssrc/daybook_core/index/doc_blobs.rssrc/daybook_core/lib.rssrc/daybook_core/plugs.rssrc/daybook_core/repo.rssrc/daybook_core/repos.rssrc/daybook_core/rt/dispatch.rssrc/daybook_core/rt/init.rssrc/daybook_core/rt/switch.rssrc/daybook_core/sync.rssrc/daybook_core/sync/bootstrap.rssrc/daybook_core/sync/full.rssrc/daybook_core/sync/full/doc_worker.rssrc/daybook_core/sync/full/import_worker.rssrc/daybook_core/sync/full/scheduler.rssrc/daybook_core/tables.rssrc/daybook_core/test_support.rssrc/daybook_ffi/Cargo.tomlsrc/daybook_server/Cargo.tomlsrc/xtask/Cargo.tomlsrc/xtask/main.rs
💤 Files with no reviewable changes (7)
- src/daybook_server/Cargo.toml
- src/daybook_cli/Cargo.toml
- src/daybook_ffi/Cargo.toml
- src/am_utils_rs/sync.rs
- src/am_utils_rs/repo/changes/broker.rs
- src/am_utils_rs/repo/iroh/codec.rs
- src/am_utils_rs/repo/iroh.rs
| Better to have good alignment with the operator as opposed to spending a million iterations on the same thing. | ||
|
|
||
| Try to be intelligent about the user's intention. | ||
| If it looks or inelegant what you're doing, you're very liekly misalinged and wasting effort. |
There was a problem hiding this comment.
Fix spelling errors.
Two spelling errors on this line: "liekly" should be "likely" and "misalinged" should be "misaligned".
✏️ Proposed fix
-If it looks or inelegant what you're doing, you're very liekly misalinged and wasting effort.
+If it looks or inelegant what you're doing, you're very likely misaligned and wasting effort.📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| If it looks or inelegant what you're doing, you're very liekly misalinged and wasting effort. | |
| If it looks or inelegant what you're doing, you're very likely misaligned and wasting effort. |
🧰 Tools
🪛 LanguageTool
[grammar] ~137-~137: Ensure spelling is correct
Context: ...nelegant what you're doing, you're very liekly misalinged and wasting effort. Make sure to get mo...
(QB_NEW_EN_ORTHOGRAPHY_ERROR_IDS_1)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@AGENTS.md` at line 137, Update the sentence "If it looks or inelegant what
you're doing, you're very liekly misalinged and wasting effort." in AGENTS.md by
correcting the two misspellings: change "liekly" to "likely" and "misalinged" to
"misaligned" (leave the rest of the sentence as-is or adjust surrounding grammar
if desired); locate the exact line containing that sentence to apply the
replacements.
| async fn load_live_bundle( | ||
| self: &Arc<Self>, | ||
| document_id: &DocumentId, | ||
| ) -> Res<Option<samod::DocHandle>> { | ||
| let handle = self | ||
| .repo | ||
| .find(document_id.clone()) | ||
| .await | ||
| .map_err(|err| ferr!("failed finding doc: {err}"))?; | ||
| let Some(handle) = handle else { | ||
| ) -> Res<Option<Arc<LiveDocBundle>>> { | ||
| if let Some(existing) = self | ||
| .live_bundles | ||
| .get(document_id) | ||
| .and_then(|entry| entry.value().upgrade()) | ||
| { | ||
| return Ok(Some(existing)); | ||
| } | ||
|
|
||
| let Some(doc) = self.load_automerge(document_id).await? else { | ||
| return Ok(None); | ||
| }; | ||
| Ok(Some(handle)) | ||
| let lease = self.runtime.acquire_doc_lease(*document_id).await?; | ||
| let bundle = Arc::new(LiveDocBundle::new(*document_id, doc, lease)); | ||
| self.live_bundles | ||
| .insert(*document_id, Arc::downgrade(&bundle)); | ||
| Ok(Some(bundle)) |
There was a problem hiding this comment.
Avoid creating multiple live bundles for the same document on concurrent cold loads.
This does a cache lookup, then awaits, then inserts. Two callers can both miss live_bundles, both hydrate the same DocumentId, and both return distinct Arc<LiveDocBundle> instances. That breaks the single-mutex/live-view invariant and leaves one handle permanently stale. Re-check after the await or use an entry-style placeholder so only one bundle gets materialized.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/am_utils_rs/repo.rs` around lines 357 - 376, load_live_bundle currently
races: two callers can miss live_bundles, both await load_automerge and create
distinct LiveDocBundle Arcs. Fix by re-checking the map after the await before
inserting so only one bundle is materialized: after creating bundle (in
load_live_bundle) call self.live_bundles.get(document_id).and_then(|e|
e.value().upgrade()) and if it returns Some(existing) drop the newly created
bundle and return existing; otherwise insert the new Arc::downgrade(&bundle) and
return it. Alternatively implement an entry-style placeholder in live_bundles so
only the first waiter creates the LiveDocBundle; reference load_live_bundle,
live_bundles, LiveDocBundle, and runtime.acquire_doc_lease when applying the
change.
| pub async fn with_document_with_origin<F, R>( | ||
| &self, | ||
| operation: F, | ||
| origin: BigRepoChangeOrigin, | ||
| ) -> Res<R> | ||
| where | ||
| F: FnOnce(&mut automerge::Automerge) -> R, | ||
| { | ||
| let (before_heads, out, after_heads) = self.inner.with_document(|doc| { | ||
| let before_heads = doc.get_heads(); | ||
| let out = operation(doc); | ||
| let after_heads = doc.get_heads(); | ||
| (before_heads, out, after_heads) | ||
| }); | ||
| if before_heads != after_heads { | ||
| self.repo | ||
| .on_doc_heads_changed(self.document_id(), after_heads) | ||
| .await?; | ||
| let mut doc = self.bundle.doc.lock().await; | ||
|
|
||
| let before_heads = doc.get_heads(); | ||
| let out = operation(&mut doc); | ||
| let after_heads = doc.get_heads(); | ||
| if before_heads == after_heads { | ||
| return Ok(out); | ||
| } | ||
|
|
||
| let patches = doc.diff(&before_heads, &after_heads); | ||
| let changes = doc | ||
| .get_changes(&before_heads) | ||
| .into_iter() | ||
| .map(|change| { | ||
| let head = CommitId::new(change.hash().0); | ||
| let parents = change | ||
| .deps() | ||
| .iter() | ||
| .map(|dep| CommitId::new(dep.0)) | ||
| .collect::<BTreeSet<_>>(); | ||
| (head, parents, change.raw_bytes().to_vec()) | ||
| }) | ||
| .collect::<Vec<_>>(); | ||
|
|
||
| self.repo | ||
| .apply_commit_delta(*self.document_id(), changes, after_heads, patches, origin) | ||
| .await?; |
There was a problem hiding this comment.
Don't mutate the live Automerge state before persistence succeeds.
operation(&mut doc) runs before apply_commit_delta(...). If the runtime/storage write fails, the handle keeps the new heads in memory while persistent state stays at the old heads. From that point on, reads and later writes are operating on a document state that was never committed.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/am_utils_rs/repo.rs` around lines 618 - 652, The current
with_document_with_origin calls operation(&mut doc) directly on the locked live
Automerge document before persisting, risking memory/state drift if
apply_commit_delta fails; change this to run the operation on a temporary clone
of the document (obtain the live heads with doc.get_heads(), clone the Automerge
instance, call operation(&mut doc_clone), compute after_heads, patches
(doc_clone.diff), and changes (doc_clone.get_changes) from the clone), then call
self.repo.apply_commit_delta(...) with those changes, and only after
apply_commit_delta returns successfully, merge or replace the live doc under
self.bundle.doc.lock().await with the committed changes (e.g., apply the change
bytes or swap in the clone) so the in-memory state only advances when
persistence succeeds.
| let out = async { | ||
| let sedimentree_id: SedimentreeId = doc_id.into(); | ||
| let before_doc = match subduction | ||
| .get_blobs(sedimentree_id) | ||
| .await | ||
| .map_err(|err| ferr!("failed reading blobs from subduction: {err}"))? | ||
| { | ||
| Some(blobs) => { | ||
| Some(reconstruct_automerge_from_blobs(blobs.into_iter())?) | ||
| } | ||
| None => { | ||
| match load_blobs_from_storage(&storage_for_reads, sedimentree_id) | ||
| .await | ||
| .map_err(|err| { | ||
| ferr!("failed reading blobs from storage: {err}") | ||
| })? { | ||
| Some(blobs) => { | ||
| Some(reconstruct_automerge_from_blobs(blobs)?) | ||
| } | ||
| None => None, | ||
| } | ||
| } | ||
| }; | ||
| let remote_peer_id: subduction_core::peer::id::PeerId = peer_id.into(); | ||
| let result = subduction | ||
| .sync_with_peer(&remote_peer_id, sedimentree_id, subscribe, timeout) | ||
| .await; |
There was a problem hiding this comment.
Hydrate storage-backed docs into subduction before syncing them.
When get_blobs() misses, this path reconstructs before_doc from storage only for diffing, then calls sync_with_peer() without first add_sedimentree(...)-ing those blobs back into the runtime. Unlike LoadDoc and AcquireDocLease in this same file, cold documents are therefore synced as if the local side were empty.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/am_utils_rs/repo/runtime.rs` around lines 529 - 555, When get_blobs()
returns None we reconstruct before_doc from storage for diffing but never
rehydrate the runtime; ensure that when load_blobs_from_storage(...) yields
Some(blobs) you call subduction.add_sedimentree(sedimentree_id,
blobs.into_iter()) (or the appropriate add method) before calling
subduction.sync_with_peer(...). Locate the block that sets before_doc (uses
get_blobs, load_blobs_from_storage, reconstruct_automerge_from_blobs) and after
successfully obtaining blobs from storage invoke subduction.add_sedimentree(...)
with the same SedimentreeId so the local runtime holds the blobs prior to
sync_with_peer; preserve error handling and only skip add_sedimentree when blobs
are truly absent.
| pub async fn get_plugs_heads(&self) -> ChangeHashSet { | ||
| self.app_am_handle | ||
| .with_document(|am_doc| ChangeHashSet(am_doc.get_heads().into())) | ||
| .await | ||
| .expect("with_document read should not fail") | ||
| } |
There was a problem hiding this comment.
Return with_document failures instead of panicking.
Line 754 turns a fallible document-handle read into a process panic, even though events_for_init already returns Res<_>. diff_events now propagates the same handle failure correctly, so this path should do the same instead of crashing on repo/doc lifecycle errors.
💡 Proposed fix
- pub async fn get_plugs_heads(&self) -> ChangeHashSet {
+ pub async fn get_plugs_heads(&self) -> Res<ChangeHashSet> {
self.app_am_handle
.with_document(|am_doc| ChangeHashSet(am_doc.get_heads().into()))
- .await
- .expect("with_document read should not fail")
+ .await
+ .wrap_err("failed reading plugs heads")
}
@@
- let heads = self.get_plugs_heads().await;
+ let heads = self.get_plugs_heads().await?;Also applies to: 1006-1008
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/daybook_core/plugs.rs` around lines 750 - 755, The get_plugs_heads
function currently panics on with_document failures; change its signature to
return Res<ChangeHashSet> (or the module's Result alias) and propagate the error
instead of calling expect: await the with_document call and use ? (or map_err to
convert the error into the Res error type) to return Err when the handle read
fails, e.g. self.app_am_handle.with_document(|am_doc|
ChangeHashSet(am_doc.get_heads().into())).await.map_err(...)?, and update
callers accordingly; apply the same pattern used in diff_events/events_for_init
to the other similar sites (the other occurrences noted near lines 1006-1008) so
all handle-read failures are returned rather than panicking.
| async fn register_incoming_peer( | ||
| &self, | ||
| endpoint_id: EndpointId, | ||
| peer_key: am_utils_rs::sync::protocol::PeerKey, | ||
| conn: am_utils_rs::repo::RepoConnection, | ||
| ) -> Res<()> { | ||
| let conn_id = conn.id; | ||
| if self.active_peers.read().await.contains_key(&endpoint_id) { | ||
| return Ok(()); | ||
| } | ||
| let partition_ids = self.peer_partition_ids(&peer_key); | ||
| self.sync_store | ||
| .allow_peer(peer_key.clone(), Some(endpoint_id)) | ||
| .await?; | ||
| let endpoint_addr = iroh::EndpointAddr::new(endpoint_id); | ||
| self.rcx | ||
| .big_repo | ||
| .ensure_peer_connection( | ||
| self.router.endpoint().clone(), | ||
| endpoint_addr.clone(), | ||
| am_utils_rs::repo::PeerId::new(*endpoint_id.as_bytes()), | ||
| ) | ||
| .await?; | ||
| if let Err(err) = self | ||
| .full_sync_handle | ||
| .set_connection(endpoint_id, conn_id, peer_key, partition_ids) | ||
| .set_connection( | ||
| endpoint_id, | ||
| endpoint_addr, | ||
| endpoint_id, | ||
| peer_key.clone(), | ||
| partition_ids, | ||
| ) | ||
| .await | ||
| { | ||
| self.clear_endpoint_if_connecting(endpoint_id).await; | ||
| self.rcx | ||
| .big_repo | ||
| .remove_peer_connection(am_utils_rs::repo::PeerId::new(*endpoint_id.as_bytes())) | ||
| .await | ||
| .ok(); | ||
| return Err(err); | ||
| } | ||
|
|
||
| let old = self | ||
| .active_samod_peers | ||
| self.active_peers | ||
| .write() | ||
| .await | ||
| .insert(endpoint_id, ActivePeerState::Connected(conn)); | ||
| assert!(matches!(old, Some(ActivePeerState::Connecting)), "fishy"); | ||
| .insert(endpoint_id, ActivePeerState::Connected { peer_key }); | ||
| Ok(()) |
There was a problem hiding this comment.
Restore inbound connection notification and make registration atomic.
connect_endpoint_addr reserves state and emits OutgoingConnection, but register_incoming_peer only does a read-side contains_key check and mutates the connection tables directly. That creates two problems: concurrent clone-provision requests for the same endpoint can both run setup, and subscribers never receive the public IncomingConnection event on the successful inbound path.
💡 Proposed fix
async fn register_incoming_peer(
&self,
endpoint_id: EndpointId,
peer_key: am_utils_rs::sync::protocol::PeerKey,
) -> Res<()> {
- if self.active_peers.read().await.contains_key(&endpoint_id) {
+ if !self.reserve_endpoint_connection(endpoint_id).await {
return Ok(());
}
let partition_ids = self.peer_partition_ids(&peer_key);
self.sync_store
.allow_peer(peer_key.clone(), Some(endpoint_id))
@@
if let Err(err) = self
.full_sync_handle
.set_connection(
endpoint_id,
endpoint_addr,
@@
self.rcx
.big_repo
.remove_peer_connection(am_utils_rs::repo::PeerId::new(*endpoint_id.as_bytes()))
.await
.ok();
+ self.clear_endpoint_if_connecting(endpoint_id).await;
return Err(err);
}
- self.active_peers
- .write()
- .await
- .insert(endpoint_id, ActivePeerState::Connected { peer_key });
+ let old = self
+ .active_peers
+ .write()
+ .await
+ .insert(endpoint_id, ActivePeerState::Connected { peer_key });
+ assert!(matches!(old, Some(ActivePeerState::Connecting)), "fishy");
+ self.registry.notify([IrohSyncEvent::IncomingConnection {
+ endpoint_id,
+ conn_id: endpoint_id,
+ peer_id: endpoint_id.to_string().into(),
+ }]);
Ok(())
}🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/daybook_core/sync.rs` around lines 587 - 630, register_incoming_peer
currently only does a read-side contains_key and then proceeds to mutate
connection tables, which allows concurrent setups for the same endpoint and
never emits the public IncomingConnection event; make the registration atomic by
taking the write lock on active_peers up-front (or use the same reservation path
as connect_endpoint_addr) to check-and-reserve the endpoint before performing
async setup, emit the IncomingConnection event to subscribers after a successful
registration, and ensure cleanup on failure (remove_peer_connection and clear
any reserved state) if full_sync_handle.set_connection or other async steps
fail; reference register_incoming_peer, active_peers,
full_sync_handle.set_connection, rcx.big_repo.ensure_peer_connection and
rcx.big_repo.remove_peer_connection when making these changes.
| irpc = { version = "0.13.0", features = ["derive"] } | ||
| irpc-iroh = "0.13.0" | ||
| irpc-iroh = "0.12.0" |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
# First, check the actual content of src/xtask/Cargo.toml
cat -n src/xtask/Cargo.toml | sed -n '35,45p'Repository: dman-os/townframe
Length of output: 566
🌐 Web query:
irpc-iroh 0.12.0 crate dependencies
💡 Result:
The Rust crate irpc-iroh version 0.12.0 (released 2026-01-28) has the following dependencies (normal unless noted): Normal dependencies: - getrandom ^0.3 - tracing ^0.1.41 - tokio ^1.44 - serde ^1 - postcard ^1.1.1 - n0-future ^0.3 - n0-error ^0.1 - irpc ^0.12.0 - iroh-base ^0.96 - iroh ^0.96 Dev dependencies: - tracing-subscriber ^0.3.20 - rand ^0.9.2 - irpc-derive ^0.9.0 - hex ^0.4.3 - futures-util ^0.3 - clap ^4.5.41 - anyhow ^1
Citations:
- 1: https://crates.io/crates/irpc-iroh/dependencies
- 2: https://docs.rs/crate/irpc-iroh/latest
- 3: https://crates.io/crates/irpc-iroh
🌐 Web query:
irpc 0.13.0 irpc-iroh compatibility
💡 Result:
irpc 0.13.0 is fully compatible with irpc-iroh. The irpc-iroh crate version 0.13.0 explicitly depends on irpc ^0.13.0, as shown in its Cargo.toml on crates.io. Both crates were released on the same date, 2026-03-16, ensuring version alignment. irpc-iroh provides the Iroh transport implementation for irpc, enabling RPC over iroh connections, and is designed for use with matching irpc versions. Official documentation and blog posts confirm irpc is a lightweight RPC system for iroh, with irpc-iroh handling the iroh-specific transport (e.g., IrohProtocol, IrohRemoteConnection). No compatibility issues or breaking changes are noted in available sources for these versions.
Citations:
- 1: https://crates.io/crates/irpc-iroh
- 2: https://crates.io/crates/irpc
- 3: https://github.com/n0-computer/irpc
- 4: https://www.iroh.computer/blog/irpc
- 5: https://docs.iroh.computer/protocols/rpc
- 6: https://docs.rs/irpc-iroh
Keep irpc and irpc-iroh on the same release line.
irpc-iroh 0.12.0 declares a dependency on irpc ^0.12.0. Pinning irpc = 0.13.0 alongside irpc-iroh = 0.12.0 will cause Cargo to pull incompatible versions into xtask. Either downgrade irpc to 0.12.0 or upgrade irpc-iroh to 0.13.0.
🔧 If `xtask` is meant to follow the rest of this PR's transport downgrade
-irpc = { version = "0.13.0", features = ["derive"] }
-irpc-iroh = "0.12.0"
+irpc = { version = "0.12.0", features = ["derive"] }
+irpc-iroh = "0.12.0"📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| irpc = { version = "0.13.0", features = ["derive"] } | |
| irpc-iroh = "0.13.0" | |
| irpc-iroh = "0.12.0" | |
| irpc = { version = "0.12.0", features = ["derive"] } | |
| irpc-iroh = "0.12.0" |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/xtask/Cargo.toml` around lines 38 - 39, The Cargo manifest lists
mismatched crate versions: irpc = "0.13.0" and irpc-iroh = "0.12.0" (irpc-iroh
depends on irpc ^0.12.0), which will pull incompatible irpc versions; fix by
aligning them—either change irpc to "0.12.0" to match irpc-iroh or bump
irpc-iroh to "0.13.0" so both use the same release line; update the entries for
irpc and/or irpc-iroh in Cargo.toml accordingly.
There was a problem hiding this comment.
Actionable comments posted: 7
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/am_utils_rs/partition.rs (1)
131-138:⚠️ Potential issue | 🔴 CriticalAdd a real migration for
partition_state.change_count.
CREATE TABLE IF NOT EXISTSwill not add this column for existing databases. After upgrade, the new writes inrecord_item_change_tx/record_item_deleted_txwill start failing as soon as they touchchange_count. This needs anALTER TABLE ... ADD COLUMN-style migration (guarded by schema inspection) before the new writes run.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/am_utils_rs/partition.rs` around lines 131 - 138, The CREATE TABLE IF NOT EXISTS block won't add change_count for existing DBs; add an explicit migration that checks the partition_state schema (e.g., PRAGMA table_info('partition_state') or information_schema) and, if change_count is missing, runs ALTER TABLE partition_state ADD COLUMN change_count INTEGER NOT NULL DEFAULT 0 before any code that writes the column (ensure this migration runs prior to record_item_change_tx and record_item_deleted_tx); place the migration alongside the existing table-creation logic and run it using the same state_pool execution path.
♻️ Duplicate comments (3)
src/daybook_core/plugs.rs (1)
746-750:⚠️ Potential issue | 🟠 MajorReturn
with_documentfailures fromget_plugs_heads.This helper still turns a fallible repo/doc read into a panic, even though
events_for_initalready returnsRes<_>. If the handle read fails, Line 1004 should get anErrinstead of crashing the repo path.💡 Proposed fix
- pub async fn get_plugs_heads(&self) -> ChangeHashSet { + pub async fn get_plugs_heads(&self) -> Res<ChangeHashSet> { self.app_am_handle .with_document(|am_doc| ChangeHashSet(am_doc.get_heads().into())) .await - .expect("with_document read should not fail") + .wrap_err("failed reading plugs heads") } @@ - let heads = self.get_plugs_heads().await; + let heads = self.get_plugs_heads().await?;Based on learnings: Handle errors that are downstream of network errors or user input validation; use type state elimination to prevent error paths.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/daybook_core/plugs.rs` around lines 746 - 750, get_plugs_heads currently panics on a failed with_document read; change its signature to return the repo error type used by events_for_init (e.g., Res<ChangeHashSet>) and propagate the failure instead of using expect. Replace the expect call by awaiting with_document and using ? (or map_err) to convert/return the error, then wrap the successful value into ChangeHashSet (i.e., self.app_am_handle.with_document(|am_doc| ChangeHashSet(am_doc.get_heads().into())).await? ). Ensure callers (like events_for_init) receive the propagated Err rather than causing a panic.src/daybook_core/sync.rs (1)
607-647:⚠️ Potential issue | 🟠 MajorMake inbound registration atomic and emit the public event.
The read-side
contains_keycheck at Line 612 still leaves a TOCTOU window:incoming_peer_handleandhandle_clone_provision_requestcan race and both run setup for the same endpoint. The success path also still never publishesIrohSyncEvent::IncomingConnection, so subscribers miss inbound connects entirely. Reusereserve_endpoint_connection/clear_endpoint_if_connectinghere and notify afterset_connectionsucceeds.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/daybook_core/sync.rs` around lines 607 - 647, The inbound registration in register_incoming_peer has a TOCTOU race and never emits the public IrohSyncEvent::IncomingConnection; fix it by replacing the preliminary active_peers.read().contains_key(&endpoint_id) check with a call to reserve_endpoint_connection(endpoint_id) to atomically reserve the slot, ensure you call clear_endpoint_if_connecting(endpoint_id) on any error path (including after failed set_connection), and after a successful full_sync_handle.set_connection(...) insert ActivePeerState::Connected into active_peers and publish IrohSyncEvent::IncomingConnection to notify subscribers; keep peer_partition_ids, sync_store.allow_peer, and the connect_with_peer call as-is but ensure reservation/clear and event emission surround set_connection.src/am_utils_rs/repo/runtime.rs (1)
1321-1344:⚠️ Potential issue | 🔴 CriticalCold docs still sync without being hydrated into subduction.
sync_with_peer()is called directly from the doc worker, but this path never loads a storage-backed doc into the runtime first. If the doc only exists on disk, the local side can still sync as if it were empty/missing. The earlier cold-doc hydration bug is still present here.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/am_utils_rs/repo/runtime.rs` around lines 1321 - 1344, handle_sync_with_peer currently calls subduction.sync_with_peer without first ensuring the storage-backed document is loaded into the runtime, so cold (on-disk) docs can appear missing; before invoking self.subduction.sync_with_peer in handle_sync_with_peer, load/hydrate the document into the runtime (e.g., call the existing routine that hydrates or loads storage-backed docs for this worker — reference handle_sync_with_peer and sync_with_peer) and bail with an appropriate error if hydration fails, then proceed to call sync_with_peer only after successful hydration.
🧹 Nitpick comments (1)
src/daybook_core/lib.rs (1)
101-109: Optional: simplify the digest-to-bytes copy.
Sha256::finalize()returns aGenericArray<u8, U32>, which is already 32 bytes, so the explicit&digest[..32]slice is redundant. You can simplify todigest.into():♻️ Proposed simplification
pub(crate) fn peer_id_from_label(label: &str) -> am_utils_rs::repo::PeerId { use sha2::Digest; let mut hasher = sha2::Sha256::new(); hasher.update(label.as_bytes()); let digest = hasher.finalize(); - let mut bytes = [0_u8; 32]; - bytes.copy_from_slice(&digest[..32]); - am_utils_rs::repo::PeerId::new(bytes) + am_utils_rs::repo::PeerId::new(digest.into()) }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/daybook_core/lib.rs` around lines 101 - 109, The function peer_id_from_label currently copies the Sha256 digest into a 32-byte array via copy_from_slice; simplify by converting the GenericArray returned by Sha256::finalize() directly into a [u8; 32] (e.g., let bytes: [u8; 32] = digest.into();) and pass that to am_utils_rs::repo::PeerId::new(bytes) to remove the redundant slice/copy.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/am_utils_rs/ids.rs`:
- Around line 143-147: Replace the infallible impl From<PeerId32> for
iroh::PublicKey (which calls iroh::PublicKey::from_bytes(...).expect(...)) with
a TryFrom<PeerId32> implementation that returns Result<iroh::PublicKey, _>,
calling iroh::PublicKey::from_bytes(&value.0) and propagating the validation
error instead of panicking; update callers (if any) to handle the Result and
reference the types PeerId32, iroh::PublicKey, and iroh::PublicKey::from_bytes
when making the change.
In `@src/am_utils_rs/repo/runtime.rs`:
- Around line 1089-1098: Replace the fallible propagation in the
DocWorkerMsg::ApplySyncSession branch so that errors from
handle_apply_sync_session(session) are handled locally instead of using `?`:
call handle_apply_sync_session(session).await, match its Result, and on Err(e)
log the error (e.g., tracing::warn/error) and treat it as a recoverable sync
failure by sending the non-fatal RuntimeEvt::DocWorkerTransientFinished (via
send_runtime_evt & self.runtime_evt_tx, &self.shutdown,
RuntimeEvt::DocWorkerTransientFinished { doc_id: self.doc_id }) and return
Ok(()) rather than propagating to cause a FatalWorkerError; on Ok(()) proceed to
send the normal DocWorkerTransientFinished as before.
In `@src/am_utils_rs/sync/peer.rs`:
- Around line 203-206: When handling the branch for recv =
worker.doc_ack_rx.recv() in the peer sync loop, do not treat a None (closed
channel) as a clean break; replace the current break with returning an Err that
triggers the AbnormalExit path so the abnormal worker exit is signaled.
Concretely, in the block where let Some(ack) = recv else { ... } remove the
break and return an Err that maps to the AbnormalExit event (e.g.,
construct/propagate the existing AbnormalExit error variant or convert to the
function's error type) so abnormal shutdown order and reporting are respected.
In `@src/daybook_core/config.rs`:
- Around line 384-397: The call to automerge::diff_obj inside diff_events
currently uses expect("diff_obj failed") which panics on failure; change this to
propagate the error through the function's Res return type instead. Inside the
with_document closure (involving self.app_am_handle.with_document and variables
from/to/heads), replace the expect with converting the diff_obj Error into the
crate's error (or use ? if From is implemented) so you return Err(...) from
diff_events rather than panicking; ensure the conversion matches the project's
error type used by Res<Vec<ConfigEvent>>.
In `@src/daybook_core/sync.rs`:
- Around line 40-43: accepting a peer currently calls
self.incoming_peer_tx.try_send(conn.peer_id()) and silently drops the peer on
Full/Closed; instead, ensure the peer is never lost by handling both error
cases: if try_send returns Full, await to send using the async send (e.g.,
self.incoming_peer_tx.send(conn.peer_id()).await) or loop with backpressure
until it succeeds; if the channel is Closed, propagate an error or run the
shutdown/cleanup path (close the newly accepted connection and return an error)
so the caller knows the consumer task is gone. Update the code around
accept_peer_connection/try_send (symbols: accept_peer_connection,
incoming_peer_tx, try_send, send) to await/send on Full and to handle Closed by
returning/cleaning up rather than dropping the peer.
In `@src/daybook_core/sync/bootstrap.rs`:
- Around line 446-475: The RPC response handling for full_docs must validate
that the returned set contains exactly the two expected core docs (app_doc_id
and drawer_doc_id) and fail fast on any mismatch instead of proceeding to
import; after receiving full_docs (the result of repo_rpc.rpc for
GetDocsFullRpcReq) compute the set of returned doc IDs from full_docs.docs,
compare it to the expected set {app_doc_id, drawer_doc_id}, and return an error
(wrap_err/eyre) if any expected ID is missing or any unexpected ID is present
before iterating and calling big_repo.put_doc; keep existing
parsing/loading/import logic (DocumentId::from_str, Automerge::load,
big_repo.get_doc, big_repo.put_doc) but only run it when the ID set matches
exactly.
In `@src/daybook_core/sync/tests/ladder.rs`:
- Around line 221-225: The test sets the process-global env var directly inside
the async test (in iroh_sync_two_nodes_can_connect), which can race with other
tests; extract this into a shared Once guard like the stress test does: add a
static TEST_ENV_INIT: std::sync::Once = std::sync::Once::new(); and replace
std::env::set_var("DAYB_DISABLE_KEYRING", "1"); with TEST_ENV_INIT.call_once(||
{ std::env::set_var("DAYB_DISABLE_KEYRING", "1"); }); so the env var is only set
once safely across concurrent tests.
---
Outside diff comments:
In `@src/am_utils_rs/partition.rs`:
- Around line 131-138: The CREATE TABLE IF NOT EXISTS block won't add
change_count for existing DBs; add an explicit migration that checks the
partition_state schema (e.g., PRAGMA table_info('partition_state') or
information_schema) and, if change_count is missing, runs ALTER TABLE
partition_state ADD COLUMN change_count INTEGER NOT NULL DEFAULT 0 before any
code that writes the column (ensure this migration runs prior to
record_item_change_tx and record_item_deleted_tx); place the migration alongside
the existing table-creation logic and run it using the same state_pool execution
path.
---
Duplicate comments:
In `@src/am_utils_rs/repo/runtime.rs`:
- Around line 1321-1344: handle_sync_with_peer currently calls
subduction.sync_with_peer without first ensuring the storage-backed document is
loaded into the runtime, so cold (on-disk) docs can appear missing; before
invoking self.subduction.sync_with_peer in handle_sync_with_peer, load/hydrate
the document into the runtime (e.g., call the existing routine that hydrates or
loads storage-backed docs for this worker — reference handle_sync_with_peer and
sync_with_peer) and bail with an appropriate error if hydration fails, then
proceed to call sync_with_peer only after successful hydration.
In `@src/daybook_core/plugs.rs`:
- Around line 746-750: get_plugs_heads currently panics on a failed
with_document read; change its signature to return the repo error type used by
events_for_init (e.g., Res<ChangeHashSet>) and propagate the failure instead of
using expect. Replace the expect call by awaiting with_document and using ? (or
map_err) to convert/return the error, then wrap the successful value into
ChangeHashSet (i.e., self.app_am_handle.with_document(|am_doc|
ChangeHashSet(am_doc.get_heads().into())).await? ). Ensure callers (like
events_for_init) receive the propagated Err rather than causing a panic.
In `@src/daybook_core/sync.rs`:
- Around line 607-647: The inbound registration in register_incoming_peer has a
TOCTOU race and never emits the public IrohSyncEvent::IncomingConnection; fix it
by replacing the preliminary active_peers.read().contains_key(&endpoint_id)
check with a call to reserve_endpoint_connection(endpoint_id) to atomically
reserve the slot, ensure you call clear_endpoint_if_connecting(endpoint_id) on
any error path (including after failed set_connection), and after a successful
full_sync_handle.set_connection(...) insert ActivePeerState::Connected into
active_peers and publish IrohSyncEvent::IncomingConnection to notify
subscribers; keep peer_partition_ids, sync_store.allow_peer, and the
connect_with_peer call as-is but ensure reservation/clear and event emission
surround set_connection.
---
Nitpick comments:
In `@src/daybook_core/lib.rs`:
- Around line 101-109: The function peer_id_from_label currently copies the
Sha256 digest into a 32-byte array via copy_from_slice; simplify by converting
the GenericArray returned by Sha256::finalize() directly into a [u8; 32] (e.g.,
let bytes: [u8; 32] = digest.into();) and pass that to
am_utils_rs::repo::PeerId::new(bytes) to remove the redundant slice/copy.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 29b78976-edb7-45ef-bfc9-cbf16b89ab7b
⛔ Files ignored due to path filters (3)
Cargo.lockis excluded by!**/*.lockand included by**/*docs/DEVDOC/todo.mdis excluded by!docs/DEVDOC/**and included by**/*flake.lockis excluded by!**/*.lockand included by**/*
📒 Files selected for processing (30)
Cargo.tomlsrc/am_utils_rs/Cargo.tomlsrc/am_utils_rs/ids.rssrc/am_utils_rs/lib.rssrc/am_utils_rs/partition.rssrc/am_utils_rs/repo.rssrc/am_utils_rs/repo/changes.rssrc/am_utils_rs/repo/partition.rssrc/am_utils_rs/repo/runtime.rssrc/am_utils_rs/sync/peer.rssrc/daybook_core/app.rssrc/daybook_core/config.rssrc/daybook_core/drawer.rssrc/daybook_core/drawer/mutations.rssrc/daybook_core/index/doc_blobs.rssrc/daybook_core/lib.rssrc/daybook_core/plugs.rssrc/daybook_core/rt.rssrc/daybook_core/rt/dispatch.rssrc/daybook_core/rt/init.rssrc/daybook_core/sync.rssrc/daybook_core/sync/TODO.mdsrc/daybook_core/sync/bootstrap.rssrc/daybook_core/sync/full.rssrc/daybook_core/sync/full/doc_worker.rssrc/daybook_core/sync/full/import_worker.rssrc/daybook_core/sync/full/scheduler.rssrc/daybook_core/sync/tests/ladder.rssrc/daybook_core/tables.rssrc/daybook_core/test_support.rs
💤 Files with no reviewable changes (1)
- src/am_utils_rs/repo/partition.rs
🚧 Files skipped from review as they are similar to previous changes (5)
- src/daybook_core/rt/dispatch.rs
- src/daybook_core/test_support.rs
- src/am_utils_rs/Cargo.toml
- Cargo.toml
- src/daybook_core/app.rs
| #[cfg(feature = "iroh")] | ||
| impl From<PeerId32> for iroh::PublicKey { | ||
| fn from(value: PeerId32) -> Self { | ||
| iroh::PublicKey::from_bytes(&value.0).expect("PeerId32 must be 32 bytes") | ||
| } |
There was a problem hiding this comment.
🧩 Analysis chain
🌐 Web query:
In iroh 0.96.1, does iroh::PublicKey::from_bytes validate the 32-byte input, or is every 32-byte value accepted?
💡 Result:
In iroh 0.96.1, iroh::PublicKey::from_bytes validates the 32-byte input. It does not accept every 32-byte value. The function calls ed25519_dalek::VerifyingKey::from_bytes(bytes)?, which returns an error if the bytes do not represent a valid Ed25519 curve point. It returns Result<Self, KeyParsingError>, failing on invalid inputs. This behavior is documented in the source code and docs.rs for iroh-base (used by iroh), consistent across versions including 0.96.1.
Citations:
- 1: https://github.com/n0-computer/iroh/blob/main/iroh-base/src/key.rs
- 2: https://docs.rs/iroh/latest/iroh/struct.PublicKey.html
- 3: https://crates.io/crates/iroh-base/0.96.1
- 4: https://docs.rs/iroh-base/^0.96/
- 5: https://docs.rs/iroh-base/latest/iroh_base/struct.PublicKey.html
Use TryFrom<PeerId32> for iroh::PublicKey conversion to avoid panics on invalid Ed25519 keys.
PeerId32::new and FromStr accept any 32-byte value, but iroh::PublicKey::from_bytes validates that bytes form a valid Ed25519 curve point via ed25519_dalek. This impl's .expect() makes an otherwise valid PeerId32 panic if it doesn't represent a valid Ed25519 key—a public API contract mismatch. Prefer TryFrom to surface validation failures explicitly.
Suggested fix
-impl From<PeerId32> for iroh::PublicKey {
- fn from(value: PeerId32) -> Self {
- iroh::PublicKey::from_bytes(&value.0).expect("PeerId32 must be 32 bytes")
- }
-}
+impl TryFrom<PeerId32> for iroh::PublicKey {
+ type Error = iroh::key::KeyParsingError;
+
+ fn try_from(value: PeerId32) -> Result<Self, Self::Error> {
+ iroh::PublicKey::from_bytes(&value.into_bytes())
+ }
+}🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/am_utils_rs/ids.rs` around lines 143 - 147, Replace the infallible impl
From<PeerId32> for iroh::PublicKey (which calls
iroh::PublicKey::from_bytes(...).expect(...)) with a TryFrom<PeerId32>
implementation that returns Result<iroh::PublicKey, _>, calling
iroh::PublicKey::from_bytes(&value.0) and propagating the validation error
instead of panicking; update callers (if any) to handle the Result and reference
the types PeerId32, iroh::PublicKey, and iroh::PublicKey::from_bytes when making
the change.
| DocWorkerMsg::ApplySyncSession { session } => { | ||
| self.handle_apply_sync_session(session).await?; | ||
| send_runtime_evt( | ||
| &self.runtime_evt_tx, | ||
| &self.shutdown, | ||
| RuntimeEvt::DocWorkerTransientFinished { | ||
| doc_id: self.doc_id, | ||
| }, | ||
| ); | ||
| } |
There was a problem hiding this comment.
Don't crash the runtime on a bad sync session.
This branch propagates handle_apply_sync_session() errors out of the doc worker. Those errors come from remote/storage blobs, but they currently escalate into RuntimeEvt::FatalWorkerError and panic the whole runtime. That should stay a recoverable sync failure, not a process-killing path.
Suggested fix
DocWorkerMsg::ApplySyncSession { session } => {
- self.handle_apply_sync_session(session).await?;
+ if let Err(err) = self.handle_apply_sync_session(session).await {
+ warn!(doc_id = %self.doc_id, ?err, "dropping invalid sync session");
+ }
send_runtime_evt(
&self.runtime_evt_tx,
&self.shutdown,
RuntimeEvt::DocWorkerTransientFinished {
doc_id: self.doc_id,Based on learnings: Data structures in automerge documents and related metadata are external input that can have version drift across devices, so validation with eyre::bail!() is appropriate rather than panicking.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| DocWorkerMsg::ApplySyncSession { session } => { | |
| self.handle_apply_sync_session(session).await?; | |
| send_runtime_evt( | |
| &self.runtime_evt_tx, | |
| &self.shutdown, | |
| RuntimeEvt::DocWorkerTransientFinished { | |
| doc_id: self.doc_id, | |
| }, | |
| ); | |
| } | |
| DocWorkerMsg::ApplySyncSession { session } => { | |
| if let Err(err) = self.handle_apply_sync_session(session).await { | |
| warn!(doc_id = %self.doc_id, ?err, "dropping invalid sync session"); | |
| } | |
| send_runtime_evt( | |
| &self.runtime_evt_tx, | |
| &self.shutdown, | |
| RuntimeEvt::DocWorkerTransientFinished { | |
| doc_id: self.doc_id, | |
| }, | |
| ); | |
| } |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/am_utils_rs/repo/runtime.rs` around lines 1089 - 1098, Replace the
fallible propagation in the DocWorkerMsg::ApplySyncSession branch so that errors
from handle_apply_sync_session(session) are handled locally instead of using
`?`: call handle_apply_sync_session(session).await, match its Result, and on
Err(e) log the error (e.g., tracing::warn/error) and treat it as a recoverable
sync failure by sending the non-fatal RuntimeEvt::DocWorkerTransientFinished
(via send_runtime_evt & self.runtime_evt_tx, &self.shutdown,
RuntimeEvt::DocWorkerTransientFinished { doc_id: self.doc_id }) and return
Ok(()) rather than propagating to cause a FatalWorkerError; on Ok(()) proceed to
send the normal DocWorkerTransientFinished as before.
| recv = worker.doc_ack_rx.recv() => { | ||
| let Some(ack) = recv else { | ||
| debug!("samod ack channel closed; stopping peer sync worker"); | ||
| debug!("doc ack channel closed; stopping peer sync worker"); | ||
| break; |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
wc -l src/am_utils_rs/sync/peer.rsRepository: dman-os/townframe
Length of output: 94
🏁 Script executed:
# Let's look at the specific lines mentioned in the review comment
sed -n '195,215p' src/am_utils_rs/sync/peer.rsRepository: dman-os/townframe
Length of output: 1041
🏁 Script executed:
# Let's see more context around the worker loop to understand the structure
sed -n '150,250p' src/am_utils_rs/sync/peer.rsRepository: dman-os/townframe
Length of output: 4059
🏁 Script executed:
# Let's check what happens with run_res and where AbnormalExit is emitted
rg -A 5 -B 5 "AbnormalExit" src/am_utils_rs/sync/peer.rsRepository: dman-os/townframe
Length of output: 641
🏁 Script executed:
# Let's look for the spawn wrapper and error handling
rg -A 10 "tokio::spawn" src/am_utils_rs/sync/peer.rs | head -40Repository: dman-os/townframe
Length of output: 609
🏁 Script executed:
# Check how doc_ack_rx is initialized and from where
rg -B 10 "doc_ack_rx" src/am_utils_rs/sync/peer.rs | head -50Repository: dman-os/townframe
Length of output: 1698
🏁 Script executed:
# Find where doc_ack_tx (the sender) is created
rg -B 5 -A 5 "doc_ack_tx\|DocSyncAck" src/am_utils_rs/sync/peer.rsRepository: dman-os/townframe
Length of output: 43
🏁 Script executed:
# Search more broadly for doc_ack channel setup
rg -B 3 -A 3 "mpsc::channel.*doc_ack\|doc_ack.*mpsc::channel" src/Repository: dman-os/townframe
Length of output: 43
🏁 Script executed:
# Check who calls spawn_peer_sync_worker and how they handle shutdown
rg -B 10 -A 10 "spawn_peer_sync_worker" src/ --type rsRepository: dman-os/townframe
Length of output: 88
🏁 Script executed:
# Look for any doc sync worker or doc_sync_tx initialization patterns
rg -B 5 -A 5 "doc_sync_tx\|doc_ack_rx" src/ --type rs | head -80Repository: dman-os/townframe
Length of output: 88
🏁 Script executed:
# Search for doc_ack across all files without file type restriction
rg "doc_ack" src/Repository: dman-os/townframe
Length of output: 1130
🏁 Script executed:
# Search for spawn_peer_sync_worker calls
rg "spawn_peer_sync_worker"Repository: dman-os/townframe
Length of output: 243
🏁 Script executed:
# Let's check the entire sync module structure
find src/am_utils_rs/sync -name "*.rs" -type fRepository: dman-os/townframe
Length of output: 182
🏁 Script executed:
# Look at the mod.rs or lib to understand how channels are created
cat -n src/am_utils_rs/sync/mod.rs | head -100Repository: dman-os/townframe
Length of output: 121
🏁 Script executed:
# Look at the context in full.rs where spawn_peer_sync_worker is called
rg -B 20 -A 20 "spawn_peer_sync_worker" src/daybook_core/sync/full.rsRepository: dman-os/townframe
Length of output: 1805
🏁 Script executed:
# Check the structure and shutdown of full.rs sync session
sed -n '1,100p' src/daybook_core/sync/full.rsRepository: dman-os/townframe
Length of output: 2658
🏁 Script executed:
# Look for where doc_ack_tx is used/dropped in full.rs
rg -B 3 -A 3 "doc_ack_tx" src/daybook_core/sync/full.rsRepository: dman-os/townframe
Length of output: 1373
🏁 Script executed:
# Check if there's any shutdown or drop logic that closes channels
rg -A 20 "impl.*Drop\|fn drop\|shutdown" src/daybook_core/sync/full.rs | head -80Repository: dman-os/townframe
Length of output: 43
🏁 Script executed:
# Check if there's explicit logic that closes doc_ack_tx before the worker stops
rg -B 5 -A 5 "session.*drop\|drop.*session\|clear_peer" src/daybook_core/sync/full.rs | head -60Repository: dman-os/townframe
Length of output: 43
🏁 Script executed:
# Look at PeerPartitionSession drop/cleanup logic
rg -B 10 -A 10 "impl.*Drop.*PeerPartitionSession\|PeerPartitionSession.*drop" src/daybook_core/sync/full.rsRepository: dman-os/townframe
Length of output: 43
🏁 Script executed:
# Check the exact shutdown sequence in the main loop
rg -B 15 "session.stop.stop" src/daybook_core/sync/full.rsRepository: dman-os/townframe
Length of output: 1932
🏁 Script executed:
# Look at what session.stop.stop() does and how it signals the worker
rg -B 5 -A 15 "pub.*fn stop\|async.*fn stop" src/am_utils_rs/sync/peer.rs | head -80Repository: dman-os/townframe
Length of output: 43
🏁 Script executed:
# Check PeerSyncWorkerStopToken implementation
rg -B 5 -A 10 "PeerSyncWorkerStopToken" src/am_utils_rs/sync/peer.rsRepository: dman-os/townframe
Length of output: 1781
Treat unexpected doc_ack_rx closure as an abnormal worker exit.
When the doc-sync worker dies unexpectedly or the sender side drops without proper shutdown signaling, doc_ack_rx.recv() returns None and the worker breaks, returning Ok(()). This bypasses the AbnormalExit event entirely. Instead of breaking, return an Err so that abnormal termination is properly signaled. Per coding guidelines, respect shutdown order—unexpected channel closure should not be silently treated as a clean shutdown.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/am_utils_rs/sync/peer.rs` around lines 203 - 206, When handling the
branch for recv = worker.doc_ack_rx.recv() in the peer sync loop, do not treat a
None (closed channel) as a clean break; replace the current break with returning
an Err that triggers the AbnormalExit path so the abnormal worker exit is
signaled. Concretely, in the block where let Some(ack) = recv else { ... }
remove the break and return an Err that maps to the AbnormalExit event (e.g.,
construct/propagate the existing AbnormalExit error variant or convert to the
function's error type) so abnormal shutdown order and reporting are respected.
| let (patches, heads) = self | ||
| .app_am_handle | ||
| .with_document(|am_doc| { | ||
| let heads = if let Some(ref to_set) = to { | ||
| to_set.clone() | ||
| } else { | ||
| ChangeHashSet(am_doc.get_heads().into()) | ||
| }; | ||
| let patches = am_doc | ||
| .diff_obj(&automerge::ROOT, &from, &heads, true) | ||
| .expect("diff_obj failed"); | ||
| (patches, heads) | ||
| }) | ||
| .await?; |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
cat -n src/daybook_core/config.rs | sed -n '370,410p'Repository: dman-os/townframe
Length of output: 1728
🏁 Script executed:
# Get the function signature and broader context
rg -A 30 "fn diff_events" src/daybook_core/config.rsRepository: dman-os/townframe
Length of output: 1150
🏁 Script executed:
# Check what error handling is used elsewhere with with_document
rg -B 5 -A 10 "with_document" src/daybook_core/config.rs | head -60Repository: dman-os/townframe
Length of output: 1277
🏁 Script executed:
rg -B 3 -A 8 "wrap_err\|bail!" src/daybook_core/config.rs | head -80Repository: dman-os/townframe
Length of output: 43
🏁 Script executed:
# Check how errors are propagated in similar with_document closures
rg -B 2 -A 15 "\.with_document.*\{" src/daybook_core/config.rs | grep -A 12 "with_document"Repository: dman-os/townframe
Length of output: 550
🏁 Script executed:
# Check the drawer.rs file mentioned in the learning for examples of proper error handling
rg -B 3 -A 8 "bail!\|wrap_err" src/daybook_core/drawer.rs | head -100Repository: dman-os/townframe
Length of output: 43
🏁 Script executed:
# Look at the actual implementation of with_document to understand what it returns
fd -t f "\.rs$" -x grep -l "fn with_document" {} \;Repository: dman-os/townframe
Length of output: 87
🏁 Script executed:
# Find the with_document implementation
rg -B 5 -A 20 "fn with_document" src/am_utils_rs/repo.rsRepository: dman-os/townframe
Length of output: 3062
🏁 Script executed:
# Look for error handling patterns with diff_obj in drawer.rs (mentioned in learning)
rg -B 5 -A 10 "diff_obj" src/daybook_core/drawer.rs | head -80Repository: dman-os/townframe
Length of output: 43
🏁 Script executed:
# Check how diff_obj errors are handled elsewhere in the codebase
rg "diff_obj" -B 3 -A 3 src/daybook_core/Repository: dman-os/townframe
Length of output: 2742
🏁 Script executed:
# Look at the actual return type of diff_obj
rg -B 2 -A 10 "fn diff_obj"Repository: dman-os/townframe
Length of output: 43
Fix panic on diff_obj in diff_events—return error instead.
This function already returns Res<Vec<ConfigEvent>>, enabling proper error propagation. Line 394 converts an Automerge diff failure into a panic; remote drift or invalid replay heads should surface as Err so callers can handle it.
The pattern is already applied correctly in tables.rs, plugs.rs, and drawer/events.rs:
Proposed fix
let (patches, heads) = self
.app_am_handle
.with_document(|am_doc| {
let heads = if let Some(ref to_set) = to {
to_set.clone()
} else {
ChangeHashSet(am_doc.get_heads().into())
};
let patches = am_doc
.diff_obj(&automerge::ROOT, &from, &heads, true)
- .expect("diff_obj failed");
- (patches, heads)
+ .wrap_err("diff_obj failed")?;
+ eyre::Ok((patches, heads))
})
- .await?;
+ .await??;🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/daybook_core/config.rs` around lines 384 - 397, The call to
automerge::diff_obj inside diff_events currently uses expect("diff_obj failed")
which panics on failure; change this to propagate the error through the
function's Res return type instead. Inside the with_document closure (involving
self.app_am_handle.with_document and variables from/to/heads), replace the
expect with converting the diff_obj Error into the crate's error (or use ? if
From is implemented) so you return Err(...) from diff_events rather than
panicking; ensure the conversion matches the project's error type used by
Res<Vec<ConfigEvent>>.
| self.incoming_peer_tx | ||
| .try_send(conn.peer_id()) | ||
| .inspect_err(|err| warn!(?err, "incoming peer channel full; dropping notification")) | ||
| .ok(); |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
# First, find and examine the channel definition and its type
rg -A 5 'incoming_peer_tx' src/daybook_core/sync.rs | head -40Repository: dman-os/townframe
Length of output: 996
🏁 Script executed:
# Search for the type definition of incoming_peer_tx
rg 'incoming_peer_tx\s*:' src/daybook_core/sync.rsRepository: dman-os/townframe
Length of output: 137
🏁 Script executed:
# Check the register_incoming_peer implementation to understand the flow
sed -n '607,647p' src/daybook_core/sync.rsRepository: dman-os/townframe
Length of output: 1386
🏁 Script executed:
# Look for the SubductionProtocolHandler struct definition and initialization
ast-grep --pattern 'struct SubductionProtocolHandler {
$$$
}'Repository: dman-os/townframe
Length of output: 333
🏁 Script executed:
# Check how incoming_peer_tx is initialized to understand its type
rg -B 10 'incoming_peer_tx\s*=' src/daybook_core/sync.rs | head -50Repository: dman-os/townframe
Length of output: 43
🏁 Script executed:
# Check if there's a task consuming from incoming_peer_rx
rg -n 'incoming_peer_rx' src/daybook_core/sync.rsRepository: dman-os/townframe
Length of output: 231
🏁 Script executed:
# Search for where incoming_peer_rx is used/spawned
rg -A 20 'mut incoming_peer_rx' src/daybook_core/sync.rsRepository: dman-os/townframe
Length of output: 926
🏁 Script executed:
# Verify the full initialization context to understand shutdown order
sed -n '1,100p' src/daybook_core/sync.rsRepository: dman-os/townframe
Length of output: 3026
Never silently drop accepted peers when the inbound channel is full.
accept_peer_connection(...) succeeds here, but if try_send() fails with Full or Closed, the peer is silently discarded and never reaches the consumer task at line 285 that processes incoming peers. With a bounded channel (capacity 128), overflow is realistic under load. The warning log documents the issue but doesn't prevent data loss.
Per coding guidelines: "Never ignore channel send errors in Rust; respect shutdown order to ensure channels remain open." This is a regular mpsc channel, not a broadcast, so the error exception does not apply.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/daybook_core/sync.rs` around lines 40 - 43, accepting a peer currently
calls self.incoming_peer_tx.try_send(conn.peer_id()) and silently drops the peer
on Full/Closed; instead, ensure the peer is never lost by handling both error
cases: if try_send returns Full, await to send using the async send (e.g.,
self.incoming_peer_tx.send(conn.peer_id()).await) or loop with backpressure
until it succeeds; if the channel is Closed, propagate an error or run the
shutdown/cleanup path (close the newly accepted connection and return an error)
so the caller knows the consumer task is gone. Update the code around
accept_peer_connection/try_send (symbols: accept_peer_connection,
incoming_peer_tx, try_send, send) to await/send on Full and to handle Closed by
returning/cleaning up rather than dropping the peer.
| let full_docs = repo_rpc | ||
| .rpc(am_utils_rs::repo::rpc::GetDocsFullRpcReq { | ||
| peer: local_peer_key.to_string(), | ||
| req: am_utils_rs::repo::rpc::GetDocsFullRequest { | ||
| doc_ids: vec![app_doc_id.clone(), drawer_doc_id.clone()], | ||
| }, | ||
| }) | ||
| .await | ||
| .wrap_err("GetDocsFull rpc failed during clone bootstrap")? | ||
| .map_err(|err| eyre::eyre!("GetDocsFull rejected during clone bootstrap: {err:?}"))?; | ||
|
|
||
| for full_doc in full_docs.docs { | ||
| let parsed = DocumentId::from_str(&full_doc.doc_id).wrap_err_with(|| { | ||
| format!( | ||
| "invalid core doc id in bootstrap payload: {}", | ||
| full_doc.doc_id | ||
| ) | ||
| })?; | ||
| if big_repo.get_doc(&parsed).await?.is_some() { | ||
| continue; | ||
| } | ||
| let loaded = automerge::Automerge::load(&full_doc.automerge_save).map_err(|err| { | ||
| eyre::eyre!( | ||
| "invalid automerge payload for core doc {} during clone bootstrap: {err}", | ||
| full_doc.doc_id | ||
| ) | ||
| })?; | ||
| big_repo.put_doc(parsed, loaded).await?; | ||
| } | ||
|
|
There was a problem hiding this comment.
Fail fast if GetDocsFull does not return both required core docs.
This path is the only place here that imports bootstrap.app_doc_id / bootstrap.drawer_doc_id into big_repo, so a partial RPC reply can only degrade into the generic timeout below. It also silently accepts unexpected extra docs. Validate the returned ID set before importing and error immediately on mismatch.
Suggested guard
let full_docs = repo_rpc
.rpc(am_utils_rs::repo::rpc::GetDocsFullRpcReq {
peer: local_peer_key.to_string(),
req: am_utils_rs::repo::rpc::GetDocsFullRequest {
doc_ids: vec![app_doc_id.clone(), drawer_doc_id.clone()],
},
})
.await
.wrap_err("GetDocsFull rpc failed during clone bootstrap")?
.map_err(|err| eyre::eyre!("GetDocsFull rejected during clone bootstrap: {err:?}"))?;
+
+ let expected: HashSet<String> =
+ [app_doc_id.clone(), drawer_doc_id.clone()].into_iter().collect();
+ let returned: HashSet<String> =
+ full_docs.docs.iter().map(|doc| doc.doc_id.clone()).collect();
+ if returned != expected {
+ eyre::bail!(
+ "clone bootstrap returned wrong core docs: expected={expected:?} returned={returned:?}"
+ );
+ }
for full_doc in full_docs.docs {🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/daybook_core/sync/bootstrap.rs` around lines 446 - 475, The RPC response
handling for full_docs must validate that the returned set contains exactly the
two expected core docs (app_doc_id and drawer_doc_id) and fail fast on any
mismatch instead of proceeding to import; after receiving full_docs (the result
of repo_rpc.rpc for GetDocsFullRpcReq) compute the set of returned doc IDs from
full_docs.docs, compare it to the expected set {app_doc_id, drawer_doc_id}, and
return an error (wrap_err/eyre) if any expected ID is missing or any unexpected
ID is present before iterating and calling big_repo.put_doc; keep existing
parsing/loading/import logic (DocumentId::from_str, Automerge::load,
big_repo.get_doc, big_repo.put_doc) but only run it when the ID set matches
exactly.
| #[tokio::test(flavor = "multi_thread")] | ||
| async fn iroh_sync_two_nodes_can_connect() -> Res<()> { | ||
| utils_rs::testing::setup_tracing_once(); | ||
| std::env::set_var("DAYB_DISABLE_KEYRING", "1"); | ||
|
|
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "Cargo edition:"
rg -n '^\s*edition\s*=' Cargo.toml
echo
echo "Repeated DAYB_DISABLE_KEYRING env mutations:"
rg -n 'std::env::set_var\("DAYB_DISABLE_KEYRING"' srcRepository: dman-os/townframe
Length of output: 2218
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Check the test file structure to see all test decorators
echo "=== Test decorators in ladder.rs ==="
rg -n '#\[tokio::test' src/daybook_core/sync/tests/ladder.rs | head -20
echo
echo "=== Context around line 221-225 ==="
sed -n '220,230p' src/daybook_core/sync/tests/ladder.rs
echo
echo "=== Check for any test synchronization or setup patterns ==="
rg -n 'static.*ONCE|MUTEX|RwLock|Barrier' src/daybook_core/sync/tests/ladder.rs | head -10
echo
echo "=== Check if setup_tracing_once provides any isolation ==="
fd setup_tracing_once src --type f -x grep -l '{}' || echo "Not found in src"Repository: dman-os/townframe
Length of output: 1110
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "=== Find setup_tracing_once implementation ==="
find src -name "*.rs" -exec grep -l "fn setup_tracing_once" {} \;
echo
echo "=== Search for utils_rs crate ==="
fd -t f 'Cargo.toml' | xargs grep -l 'utils_rs' | head -3
echo
echo "=== Check for test concurrency settings ==="
rg -A 5 '\[package\]' Cargo.toml | head -20
echo
echo "=== Check if there are any environment variable guards in the codebase ==="
rg -n 'thread_local|lazy_static|once_cell' src --type rs | head -10Repository: dman-os/townframe
Length of output: 298
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "=== setup_tracing_once implementation ==="
cat src/utils_rs/testing.rs
echo
echo "=== Check Cargo.toml for test settings ==="
head -60 Cargo.tomlRepository: dman-os/townframe
Length of output: 5550
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "=== Where is DAYB_DISABLE_KEYRING used? ==="
rg -n 'DAYB_DISABLE_KEYRING' src --type rs | head -20
echo
echo "=== Check if tests are configured to run serially ==="
rg -n 'test-threads|nextest' . --type toml
echo
echo "=== Check boot_connected_sync_pair to see if it depends on the env var ==="
rg -B 5 -A 20 'fn boot_connected_sync_pair' src/daybook_core/sync/tests/ladder.rsRepository: dman-os/townframe
Length of output: 150
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "=== Where is DAYB_DISABLE_KEYRING used? ==="
rg 'DAYB_DISABLE_KEYRING' src
echo
echo "=== Check boot_connected_sync_pair to see if it depends on the env var ==="
rg -B 2 -A 25 'fn boot_connected_sync_pair' src/daybook_core/sync/tests/ladder.rs
echo
echo "=== Check if keyring module reads the env var ==="
rg -B 3 -A 10 'env.*DAYB_DISABLE_KEYRING|DAYB_DISABLE_KEYRING.*env' srcRepository: dman-os/townframe
Length of output: 29040
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "=== Check for test runner configuration ==="
find . -name "*.toml" -o -name "*.yaml" -o -name "*.yml" | head -10 | xargs grep -l "test.*threads\|nextest" 2>/dev/null || echo "No test runner config found"
echo
echo "=== Look at the TEST_ENV_INIT pattern in stress.rs more closely ==="
rg -B 3 -A 8 'TEST_ENV_INIT' src/daybook_core/sync/tests/stress.rs
echo
echo "=== Check if is_keyring_disabled is called at initialization or runtime ==="
rg -B 5 -A 5 'is_keyring_disabled' src/daybook_core/secrets.rs | head -30Repository: dman-os/townframe
Length of output: 1603
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "=== GitHub Actions test configuration ==="
cat .github/workflows/checks.yml | grep -A 20 "test\|cargo" | head -50
echo
echo "=== Check if daybook_core has any test settings ==="
find src/daybook_core -name "Cargo.toml" -exec cat {} \;Repository: dman-os/townframe
Length of output: 6178
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "=== Key finding: stress.rs test uses current_thread, not multi_thread ==="
rg -B 2 '#\[tokio::test' src/daybook_core/sync/tests/stress.rs
echo
echo "=== Verify: all ladder.rs tests use multi_thread ==="
rg '#\[tokio::test' src/daybook_core/sync/tests/ladder.rs | wc -l
echo
echo "=== Check if there are CLI instructions or docs about running tests ==="
find . -maxdepth 2 -name "README*" -o -name "CONTRIBUTING*" | head -5 | xargs cat 2>/dev/null | head -50
echo
echo "=== When is_keyring_disabled checked? Need to see the call chain ==="
rg -A 15 'fn should_use_keyring' src/daybook_core/secrets.rsRepository: dman-os/townframe
Length of output: 1852
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "=== Cargo default test behavior ==="
cargo test --help 2>&1 | grep -A 5 "test-threads" | head -10
echo
echo "=== When is is_keyring_disabled actually called? At module init or runtime? ==="
rg -B 10 'should_use_keyring\(\)' src/daybook_core/secrets.rs | head -40Repository: dman-os/townframe
Length of output: 97
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "=== When is should_use_keyring called in the initialization chain? ==="
rg -B 5 -A 15 'should_use_keyring\(\)' src/daybook_core/secrets.rs
echo
echo "=== Trace back to find where keyring is initialized ==="
rg 'CredentialProvider\|init_credentials' src/daybook_core --type rs | head -20
echo
echo "=== Does RepoCtx initialization read the env var? ==="
rg -B 3 -A 10 'impl.*RepoCtx\|fn init' src/daybook_core/lib.rs | head -50Repository: dman-os/townframe
Length of output: 2580
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "=== Trace RepoCtx::init to see when should_use_keyring is called ==="
rg 'RepoCtx::init' src/daybook_core/sync/tests/ladder.rs -B 2 -A 5 | head -40
echo
echo "=== Look at open_sync_node which is called in tests ==="
rg -B 5 -A 20 'fn open_sync_node' src/daybook_core/sync/tests/ladder.rs | head -50
echo
echo "=== The critical question: when the env var is read during node init, can multiple tests race? ==="
echo "Checking if nextest is the ONLY way tests run (which would prevent races):"
grep -i "nextest" .github/workflows/checks.ymlRepository: dman-os/townframe
Length of output: 1853
Extract DAYB_DISABLE_KEYRING setup into a shared test initialization.
The env var is process-global, and setting it inside #[tokio::test(flavor = "multi_thread")] can race with sibling tests when running with cargo test locally. The stress test already uses the correct pattern with std::sync::Once; apply the same guard here.
Evidence
The should_use_keyring() check in secrets.rs reads the env var during repository initialization (RepoCtx::init), which happens inside each test. With cargo nextest in CI, this is not an issue since each test runs in its own process. However, local cargo test runs tests concurrently in the same process, making the unguarded mutations racy.
The stress test (src/daybook_core/sync/tests/stress.rs) already demonstrates the correct pattern:
static TEST_ENV_INIT: Once = Once::new();
TEST_ENV_INIT.call_once(|| {
std::env::set_var("DAYB_DISABLE_KEYRING", "1");
});Apply this same approach to ladder.rs to prevent test flakiness.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/daybook_core/sync/tests/ladder.rs` around lines 221 - 225, The test sets
the process-global env var directly inside the async test (in
iroh_sync_two_nodes_can_connect), which can race with other tests; extract this
into a shared Once guard like the stress test does: add a static TEST_ENV_INIT:
std::sync::Once = std::sync::Once::new(); and replace
std::env::set_var("DAYB_DISABLE_KEYRING", "1"); with TEST_ENV_INIT.call_once(||
{ std::env::set_var("DAYB_DISABLE_KEYRING", "1"); }); so the env var is only set
once safely across concurrent tests.
Summary by CodeRabbit
New Features
Bug Fixes
Refactor
Documentation
Tests