Skip to content

Fix: Input/OutputTopic/Relay for Collections#228

Open
griffinmilsap wants to merge 2 commits intodevfrom
fix/collection-topics-relays
Open

Fix: Input/OutputTopic/Relay for Collections#228
griffinmilsap wants to merge 2 commits intodevfrom
fix/collection-topics-relays

Conversation

@griffinmilsap
Copy link
Collaborator

@griffinmilsap griffinmilsap commented Mar 2, 2026

Closes #11

This PR introduces new high-level graph endpoint types for Collections:

  • Topic
  • InputTopic
  • OutputTopic
  • InputRelay
  • OutputRelay

The goal is to separate:

  • zero-overhead graph topic shortcuts (Topic / InputTopic / OutputTopic)
  • explicit boundary republishers with pub/sub behavior (InputRelay / OutputRelay)

This preserves current behavior for existing systems while giving users a clear, explicit way to opt into boundary subscriber/publisher semantics.

Motivation

Historically, Collection boundary endpoints were often declared as InputStream / OutputStream, even though in many cases they functioned only as graph topic shortcut nodes (not true pub/sub clients).

This conflates two concepts:

  • graph-level topic wiring
  • runtime subscriber/publisher configuration

The new API makes that distinction explicit and backward-compatible.

What Changed

1) New endpoint classes

Added in src/ezmsg/core/stream.py:

  • Topic(Stream): non-directional graph endpoint metadata.
  • InputTopic(Topic): directional alias for Collection inputs.
  • OutputTopic(Topic): directional alias for Collection outputs.
  • InputRelay(InputTopic): Collection input boundary with subscriber-side relay options:
    • leaky
    • max_queue
    • copy_on_forward (default True)
  • OutputRelay(OutputTopic): Collection output boundary with publisher-side relay options:
    • host
    • port
    • num_buffers
    • buf_size
    • force_tcp
    • copy_on_forward (default True)

2) Internal relay unit implementation

Added src/ezmsg/core/relay.py:

  • _CollectionRelayUnit (internal)
  • _RelaySettings (internal)

Behavior:

  • materialized relays are explicit subscriber+publisher hops
  • relay forwarding defaults to deepcopy(msg) (copy_on_forward=True) for safety under zero-copy semantics
  • advanced users can disable copying (copy_on_forward=False) when appropriate

3) Unit restrictions (hard error)

Updated src/ezmsg/core/unit.py:

  • Units now raise TypeError at class creation if they declare Topic/InputTopic/OutputTopic/InputRelay/OutputRelay.
  • Units must continue using InputStream / OutputStream.

4) Collection legacy stream warning

Updated src/ezmsg/core/collection.py:

  • CollectionMeta emits FutureWarning when a Collection declares boundary InputStream/OutputStream.
  • Legacy behavior remains functional (no runtime break).

5) Relay graph materialization/rewrite in setup

Updated src/ezmsg/core/backend.py (ExecutionContext.setup):

  • detects InputRelay / OutputRelay endpoints in Collections
  • injects hidden internal relay units into the containing Collection
  • rewrites graph edges to route through relay units while preserving public Collection boundary topic names
  • synchronizes relay settings after Collection.configure() so configure-time edits to relay options are honored
  • normalized endpoint conversion now validates endpoint types (Stream, str, Enum)

6) Public exports

Updated src/ezmsg/core/__init__.py:

  • exported: Topic, InputTopic, OutputTopic, InputRelay, OutputRelay

Backward Compatibility

  • Existing Collection InputStream/OutputStream usage still works.
  • Existing systems continue to run without behavior change in that legacy path.
  • Users receive FutureWarning encouraging migration:
    • use InputTopic/OutputTopic for zero-overhead topic shortcuts
    • use InputRelay/OutputRelay for explicit boundary republishers

Zero-Copy Safety

Given high-level @ez.subscriber zero-copy semantics, relays default to safe forwarding:

  • copy_on_forward=True (default): deep-copies inbound message before republish
  • copy_on_forward=False: republish same object reference (advanced/unsafe mode)
    • Note that currently, leaky subscribers already deepcopy (which could change in the future), so it may be completely rational to provide leaky=True, copy_on_forward=False.

This is intended to prevent aliasing/corruption hazards when relaying cached/shared-memory-backed messages.

Tests Added

New file: tests/test_topics.py

Covers:

  • Unit rejects all topic/relay endpoint types (TypeError)
  • Collection legacy stream endpoint emits FutureWarning
  • InputTopic/OutputTopic preserve topic-shortcut behavior
  • InputRelay rewrites edges correctly and syncs configure-time settings
  • OutputRelay rewrites edges correctly and syncs configure-time settings

griffinmilsap added a commit that referenced this pull request Mar 2, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Enhancement (Maybe bugfix?): Republisher support for OutputStreams in Collections

1 participant