From 90f9cad1c2022961307be6e3e5f5ccd119e6e785 Mon Sep 17 00:00:00 2001 From: GREENRAT-K405 Date: Sun, 1 Mar 2026 22:51:39 +0530 Subject: [PATCH 1/5] fix redundant zmq.Context() Instances --- concore_base.py | 27 +++++++++++++++++++-------- 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/concore_base.py b/concore_base.py index f6b3e1b..728d807 100644 --- a/concore_base.py +++ b/concore_base.py @@ -11,15 +11,18 @@ # =================================================================== # ZeroMQ Communication Wrapper # =================================================================== +# single shared ZMQ context for the entire process. +_zmq_context = zmq.Context() + class ZeroMQPort: - def __init__(self, port_type, address, zmq_socket_type): + def __init__(self, port_type, address, zmq_socket_type, context): """ port_type: "bind" or "connect" address: ZeroMQ address (e.g., "tcp://*:5555") zmq_socket_type: zmq.REQ, zmq.REP, zmq.PUB, zmq.SUB etc. + context: shared zmq.Context() for the process (do not create one per port) """ - self.context = zmq.Context() - self.socket = self.context.socket(zmq_socket_type) + self.socket = context.socket(zmq_socket_type) self.port_type = port_type # "bind" or "connect" self.address = address @@ -76,7 +79,7 @@ def init_zmq_port(mod, port_name, port_type, address, socket_type_str): try: # Map socket type string to actual ZMQ constant (e.g., zmq.REQ, zmq.REP) zmq_socket_type = getattr(zmq, socket_type_str.upper()) - mod.zmq_ports[port_name] = ZeroMQPort(port_type, address, zmq_socket_type) + mod.zmq_ports[port_name] = ZeroMQPort(port_type, address, zmq_socket_type, _zmq_context) logger.info(f"Initialized ZMQ port: {port_name} ({socket_type_str}) on {address}") except AttributeError: logger.error(f"Error: Invalid ZMQ socket type string '{socket_type_str}'.") @@ -86,23 +89,31 @@ def init_zmq_port(mod, port_name, port_type, address, socket_type_str): logger.error(f"An unexpected error occurred during ZMQ port initialization for {port_name}: {e}") def terminate_zmq(mod): - """Clean up all ZMQ sockets and contexts before exit.""" + """Clean up all ZMQ sockets, then terminate the shared context once.""" if mod._cleanup_in_progress: return # Already cleaning up, prevent reentrant calls - + if not mod.zmq_ports: return # No ports to clean up - + mod._cleanup_in_progress = True print("\nCleaning up ZMQ resources...") + + # all sockets must be closed before context.term() is called. for port_name, port in mod.zmq_ports.items(): try: port.socket.close() - port.context.term() print(f"Closed ZMQ port: {port_name}") except Exception as e: logger.error(f"Error while terminating ZMQ port {port.address}: {e}") mod.zmq_ports.clear() + + # terminate the single shared context exactly once. + try: + _zmq_context.term() + except Exception as e: + logger.error(f"Error while terminating shared ZMQ context: {e}") + mod._cleanup_in_progress = False # --- ZeroMQ Integration End --- From 7736a97d5f4041c37eae0b8c80ca53fecc7dee68 Mon Sep 17 00:00:00 2001 From: GREENRAT-K405 Date: Sun, 1 Mar 2026 23:07:06 +0530 Subject: [PATCH 2/5] fix redundant zmq.Context() Instances --- concore_base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/concore_base.py b/concore_base.py index 728d807..fddb1f6 100644 --- a/concore_base.py +++ b/concore_base.py @@ -20,7 +20,7 @@ def __init__(self, port_type, address, zmq_socket_type, context): port_type: "bind" or "connect" address: ZeroMQ address (e.g., "tcp://*:5555") zmq_socket_type: zmq.REQ, zmq.REP, zmq.PUB, zmq.SUB etc. - context: shared zmq.Context() for the process (do not create one per port) + context: shared zmq.Context() for the process """ self.socket = context.socket(zmq_socket_type) self.port_type = port_type # "bind" or "connect" From 0c4ec88003243759c1c85eeb84e36e4bb09652d4 Mon Sep 17 00:00:00 2001 From: GREENRAT-K405 Date: Sun, 1 Mar 2026 23:22:05 +0530 Subject: [PATCH 3/5] azy-init shared context instead of one per port --- concore_base.py | 35 +++++++++++++++++++++++++---------- 1 file changed, 25 insertions(+), 10 deletions(-) diff --git a/concore_base.py b/concore_base.py index fddb1f6..b7feefd 100644 --- a/concore_base.py +++ b/concore_base.py @@ -11,17 +11,28 @@ # =================================================================== # ZeroMQ Communication Wrapper # =================================================================== -# single shared ZMQ context for the entire process. -_zmq_context = zmq.Context() +# lazy-initialized shared ZMQ context for the entire process. +# using None until first ZMQ port is created, so file-only workflows +# never spawn ZMQ I/O threads at import time. +_zmq_context = None + +def _get_zmq_context(): + """Return the process-level shared ZMQ context, creating it on first call.""" + global _zmq_context + if _zmq_context is None or _zmq_context.closed: + _zmq_context = zmq.Context() + return _zmq_context class ZeroMQPort: - def __init__(self, port_type, address, zmq_socket_type, context): + def __init__(self, port_type, address, zmq_socket_type, context=None): """ port_type: "bind" or "connect" address: ZeroMQ address (e.g., "tcp://*:5555") zmq_socket_type: zmq.REQ, zmq.REP, zmq.PUB, zmq.SUB etc. - context: shared zmq.Context() for the process + context: optional zmq.Context() for the process; defaults to the shared _zmq_context. """ + if context is None: + context = _get_zmq_context() self.socket = context.socket(zmq_socket_type) self.port_type = port_type # "bind" or "connect" self.address = address @@ -79,7 +90,7 @@ def init_zmq_port(mod, port_name, port_type, address, socket_type_str): try: # Map socket type string to actual ZMQ constant (e.g., zmq.REQ, zmq.REP) zmq_socket_type = getattr(zmq, socket_type_str.upper()) - mod.zmq_ports[port_name] = ZeroMQPort(port_type, address, zmq_socket_type, _zmq_context) + mod.zmq_ports[port_name] = ZeroMQPort(port_type, address, zmq_socket_type, _get_zmq_context()) logger.info(f"Initialized ZMQ port: {port_name} ({socket_type_str}) on {address}") except AttributeError: logger.error(f"Error: Invalid ZMQ socket type string '{socket_type_str}'.") @@ -108,11 +119,15 @@ def terminate_zmq(mod): logger.error(f"Error while terminating ZMQ port {port.address}: {e}") mod.zmq_ports.clear() - # terminate the single shared context exactly once. - try: - _zmq_context.term() - except Exception as e: - logger.error(f"Error while terminating shared ZMQ context: {e}") + # terminate the single shared context exactly once, then reset so it + # can be safely recreated if init_zmq_port is called again later. + global _zmq_context + if _zmq_context is not None and not _zmq_context.closed: + try: + _zmq_context.term() + except Exception as e: + logger.error(f"Error while terminating shared ZMQ context: {e}") + _zmq_context = None mod._cleanup_in_progress = False From fb27fa9c80a6a4223ff9a0e56e417009f4690d75 Mon Sep 17 00:00:00 2001 From: PARAM KANADA Date: Sun, 1 Mar 2026 23:30:50 +0530 Subject: [PATCH 4/5] Update concore_base.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- concore_base.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/concore_base.py b/concore_base.py index b7feefd..e8f6197 100644 --- a/concore_base.py +++ b/concore_base.py @@ -104,8 +104,8 @@ def terminate_zmq(mod): if mod._cleanup_in_progress: return # Already cleaning up, prevent reentrant calls - if not mod.zmq_ports: - return # No ports to clean up + if not mod.zmq_ports and (_zmq_context is None or _zmq_context.closed): + return # Nothing to clean up: no ports and no active context mod._cleanup_in_progress = True print("\nCleaning up ZMQ resources...") From 0081e834abadfc6378f8b2ab198f98a4344d32e2 Mon Sep 17 00:00:00 2001 From: GREENRAT-K405 Date: Sun, 1 Mar 2026 23:37:54 +0530 Subject: [PATCH 5/5] fix: move global _zmq_context declaration before first use in terminate_zmq --- concore_base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/concore_base.py b/concore_base.py index e8f6197..24547b0 100644 --- a/concore_base.py +++ b/concore_base.py @@ -101,6 +101,7 @@ def init_zmq_port(mod, port_name, port_type, address, socket_type_str): def terminate_zmq(mod): """Clean up all ZMQ sockets, then terminate the shared context once.""" + global _zmq_context # declared first — used both in the early-return guard and reset below if mod._cleanup_in_progress: return # Already cleaning up, prevent reentrant calls @@ -121,7 +122,6 @@ def terminate_zmq(mod): # terminate the single shared context exactly once, then reset so it # can be safely recreated if init_zmq_port is called again later. - global _zmq_context if _zmq_context is not None and not _zmq_context.closed: try: _zmq_context.term()