Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 36 additions & 10 deletions concore_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,29 @@
# ===================================================================
# ZeroMQ Communication Wrapper
# ===================================================================
# lazy-initialized shared ZMQ context for the entire process.
# using None until first ZMQ port is created, so file-only workflows
# never spawn ZMQ I/O threads at import time.
_zmq_context = None

def _get_zmq_context():
"""Return the process-level shared ZMQ context, creating it on first call."""
global _zmq_context
if _zmq_context is None or _zmq_context.closed:
_zmq_context = zmq.Context()
return _zmq_context
Comment on lines +19 to +24
Copy link

Copilot AI Mar 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new shared-context lifecycle (_get_zmq_context() lazy init + terminate_zmq() single term) isn’t covered by tests. Adding a unit test that asserts only one zmq.Context is created across multiple init_zmq_port calls, and that terminate_zmq resets/terminates the context (including the failure path where port creation raises and no port is registered) would help prevent regressions.

Copilot uses AI. Check for mistakes.
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will write tests once this PR is discussed with mentors!


class ZeroMQPort:
def __init__(self, port_type, address, zmq_socket_type):
def __init__(self, port_type, address, zmq_socket_type, context=None):
"""
port_type: "bind" or "connect"
address: ZeroMQ address (e.g., "tcp://*:5555")
zmq_socket_type: zmq.REQ, zmq.REP, zmq.PUB, zmq.SUB etc.
context: optional zmq.Context() for the process; defaults to the shared _zmq_context.
"""
self.context = zmq.Context()
self.socket = self.context.socket(zmq_socket_type)
if context is None:
context = _get_zmq_context()
self.socket = context.socket(zmq_socket_type)
self.port_type = port_type # "bind" or "connect"
self.address = address

Expand Down Expand Up @@ -76,7 +90,7 @@ def init_zmq_port(mod, port_name, port_type, address, socket_type_str):
try:
# Map socket type string to actual ZMQ constant (e.g., zmq.REQ, zmq.REP)
zmq_socket_type = getattr(zmq, socket_type_str.upper())
mod.zmq_ports[port_name] = ZeroMQPort(port_type, address, zmq_socket_type)
mod.zmq_ports[port_name] = ZeroMQPort(port_type, address, zmq_socket_type, _get_zmq_context())
logger.info(f"Initialized ZMQ port: {port_name} ({socket_type_str}) on {address}")
except AttributeError:
logger.error(f"Error: Invalid ZMQ socket type string '{socket_type_str}'.")
Expand All @@ -86,23 +100,35 @@ def init_zmq_port(mod, port_name, port_type, address, socket_type_str):
logger.error(f"An unexpected error occurred during ZMQ port initialization for {port_name}: {e}")

def terminate_zmq(mod):
"""Clean up all ZMQ sockets and contexts before exit."""
"""Clean up all ZMQ sockets, then terminate the shared context once."""
global _zmq_context # declared first — used both in the early-return guard and reset below
if mod._cleanup_in_progress:
return # Already cleaning up, prevent reentrant calls
if not mod.zmq_ports:
return # No ports to clean up

if not mod.zmq_ports and (_zmq_context is None or _zmq_context.closed):
return # Nothing to clean up: no ports and no active context

mod._cleanup_in_progress = True
print("\nCleaning up ZMQ resources...")

# all sockets must be closed before context.term() is called.
for port_name, port in mod.zmq_ports.items():
try:
port.socket.close()
port.context.term()
print(f"Closed ZMQ port: {port_name}")
except Exception as e:
logger.error(f"Error while terminating ZMQ port {port.address}: {e}")
mod.zmq_ports.clear()

# terminate the single shared context exactly once, then reset so it
# can be safely recreated if init_zmq_port is called again later.
if _zmq_context is not None and not _zmq_context.closed:
try:
_zmq_context.term()
except Exception as e:
logger.error(f"Error while terminating shared ZMQ context: {e}")
_zmq_context = None

mod._cleanup_in_progress = False

# --- ZeroMQ Integration End ---
Expand Down