diff --git a/concore_base.py b/concore_base.py index f6b3e1b..24547b0 100644 --- a/concore_base.py +++ b/concore_base.py @@ -11,15 +11,29 @@ # =================================================================== # ZeroMQ Communication Wrapper # =================================================================== +# lazy-initialized shared ZMQ context for the entire process. +# using None until first ZMQ port is created, so file-only workflows +# never spawn ZMQ I/O threads at import time. +_zmq_context = None + +def _get_zmq_context(): + """Return the process-level shared ZMQ context, creating it on first call.""" + global _zmq_context + if _zmq_context is None or _zmq_context.closed: + _zmq_context = zmq.Context() + return _zmq_context + class ZeroMQPort: - def __init__(self, port_type, address, zmq_socket_type): + def __init__(self, port_type, address, zmq_socket_type, context=None): """ port_type: "bind" or "connect" address: ZeroMQ address (e.g., "tcp://*:5555") zmq_socket_type: zmq.REQ, zmq.REP, zmq.PUB, zmq.SUB etc. + context: optional zmq.Context() for the process; defaults to the shared _zmq_context. """ - self.context = zmq.Context() - self.socket = self.context.socket(zmq_socket_type) + if context is None: + context = _get_zmq_context() + self.socket = context.socket(zmq_socket_type) self.port_type = port_type # "bind" or "connect" self.address = address @@ -76,7 +90,7 @@ def init_zmq_port(mod, port_name, port_type, address, socket_type_str): try: # Map socket type string to actual ZMQ constant (e.g., zmq.REQ, zmq.REP) zmq_socket_type = getattr(zmq, socket_type_str.upper()) - mod.zmq_ports[port_name] = ZeroMQPort(port_type, address, zmq_socket_type) + mod.zmq_ports[port_name] = ZeroMQPort(port_type, address, zmq_socket_type, _get_zmq_context()) logger.info(f"Initialized ZMQ port: {port_name} ({socket_type_str}) on {address}") except AttributeError: logger.error(f"Error: Invalid ZMQ socket type string '{socket_type_str}'.") @@ -86,23 +100,35 @@ def init_zmq_port(mod, port_name, port_type, address, socket_type_str): logger.error(f"An unexpected error occurred during ZMQ port initialization for {port_name}: {e}") def terminate_zmq(mod): - """Clean up all ZMQ sockets and contexts before exit.""" + """Clean up all ZMQ sockets, then terminate the shared context once.""" + global _zmq_context # declared first — used both in the early-return guard and reset below if mod._cleanup_in_progress: return # Already cleaning up, prevent reentrant calls - - if not mod.zmq_ports: - return # No ports to clean up - + + if not mod.zmq_ports and (_zmq_context is None or _zmq_context.closed): + return # Nothing to clean up: no ports and no active context + mod._cleanup_in_progress = True print("\nCleaning up ZMQ resources...") + + # all sockets must be closed before context.term() is called. for port_name, port in mod.zmq_ports.items(): try: port.socket.close() - port.context.term() print(f"Closed ZMQ port: {port_name}") except Exception as e: logger.error(f"Error while terminating ZMQ port {port.address}: {e}") mod.zmq_ports.clear() + + # terminate the single shared context exactly once, then reset so it + # can be safely recreated if init_zmq_port is called again later. + if _zmq_context is not None and not _zmq_context.closed: + try: + _zmq_context.term() + except Exception as e: + logger.error(f"Error while terminating shared ZMQ context: {e}") + _zmq_context = None + mod._cleanup_in_progress = False # --- ZeroMQ Integration End ---