Fix: Input/OutputTopic/Relay for Collections#228
Open
griffinmilsap wants to merge 2 commits intodevfrom
Open
Conversation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Closes #11
This PR introduces new high-level graph endpoint types for Collections:
TopicInputTopicOutputTopicInputRelayOutputRelayThe goal is to separate:
Topic/InputTopic/OutputTopic)InputRelay/OutputRelay)This preserves current behavior for existing systems while giving users a clear, explicit way to opt into boundary subscriber/publisher semantics.
Motivation
Historically, Collection boundary endpoints were often declared as
InputStream/OutputStream, even though in many cases they functioned only as graph topic shortcut nodes (not true pub/sub clients).This conflates two concepts:
The new API makes that distinction explicit and backward-compatible.
What Changed
1) New endpoint classes
Added in
src/ezmsg/core/stream.py:Topic(Stream): non-directional graph endpoint metadata.InputTopic(Topic): directional alias for Collection inputs.OutputTopic(Topic): directional alias for Collection outputs.InputRelay(InputTopic): Collection input boundary with subscriber-side relay options:leakymax_queuecopy_on_forward(defaultTrue)OutputRelay(OutputTopic): Collection output boundary with publisher-side relay options:hostportnum_buffersbuf_sizeforce_tcpcopy_on_forward(defaultTrue)2) Internal relay unit implementation
Added
src/ezmsg/core/relay.py:_CollectionRelayUnit(internal)_RelaySettings(internal)Behavior:
deepcopy(msg)(copy_on_forward=True) for safety under zero-copy semanticscopy_on_forward=False) when appropriate3) Unit restrictions (hard error)
Updated
src/ezmsg/core/unit.py:TypeErrorat class creation if they declareTopic/InputTopic/OutputTopic/InputRelay/OutputRelay.InputStream/OutputStream.4) Collection legacy stream warning
Updated
src/ezmsg/core/collection.py:CollectionMetaemitsFutureWarningwhen a Collection declares boundaryInputStream/OutputStream.5) Relay graph materialization/rewrite in setup
Updated
src/ezmsg/core/backend.py(ExecutionContext.setup):InputRelay/OutputRelayendpoints in CollectionsCollection.configure()so configure-time edits to relay options are honoredStream,str,Enum)6) Public exports
Updated
src/ezmsg/core/__init__.py:Topic,InputTopic,OutputTopic,InputRelay,OutputRelayBackward Compatibility
InputStream/OutputStreamusage still works.FutureWarningencouraging migration:InputTopic/OutputTopicfor zero-overhead topic shortcutsInputRelay/OutputRelayfor explicit boundary republishersZero-Copy Safety
Given high-level
@ez.subscriberzero-copy semantics, relays default to safe forwarding:copy_on_forward=True(default): deep-copies inbound message before republishcopy_on_forward=False: republish same object reference (advanced/unsafe mode)leakysubscribers already deepcopy (which could change in the future), so it may be completely rational to provideleaky=True, copy_on_forward=False.This is intended to prevent aliasing/corruption hazards when relaying cached/shared-memory-backed messages.
Tests Added
New file:
tests/test_topics.pyCovers:
TypeError)FutureWarningInputTopic/OutputTopicpreserve topic-shortcut behaviorInputRelayrewrites edges correctly and syncs configure-time settingsOutputRelayrewrites edges correctly and syncs configure-time settings