From d8f3e6cf52572e0c8fbd9ee5c1f0f95cb00fb057 Mon Sep 17 00:00:00 2001 From: Avinash Kumar Deepak Date: Mon, 2 Mar 2026 01:01:47 +0530 Subject: [PATCH] feat: add ZMQ transport to concore.hpp/concore_base.hpp (fixes #474) --- concore.hpp | 120 +++++++++++++++++++++++++++++++++++++++++++++++ concore_base.hpp | 111 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 231 insertions(+) diff --git a/concore.hpp b/concore.hpp index da2c792..134bf22 100644 --- a/concore.hpp +++ b/concore.hpp @@ -49,6 +49,10 @@ class Concore{ int communication_iport = 0; // iport refers to input port int communication_oport = 0; // oport refers to input port +#ifdef CONCORE_USE_ZMQ + map zmq_ports; +#endif + public: double delay = 1; int retrycount = 0; @@ -107,6 +111,11 @@ class Concore{ */ ~Concore() { +#ifdef CONCORE_USE_ZMQ + for (auto& kv : zmq_ports) + delete kv.second; + zmq_ports.clear(); +#endif #ifdef __linux__ // Detach the shared memory segment from the process if (communication_oport == 1 && sharedData_create != nullptr) { @@ -549,6 +558,117 @@ class Concore{ } } +#ifdef CONCORE_USE_ZMQ + /** + * @brief Registers a ZMQ port for use with read()/write(). + * @param port_name The ZMQ port name. + * @param port_type "bind" or "connect". + * @param address The ZMQ address. + * @param socket_type_str The socket type string. + */ + void init_zmq_port(string port_name, string port_type, string address, string socket_type_str) { + if (zmq_ports.count(port_name)) return; + int sock_type = concore_base::zmq_socket_type_from_string(socket_type_str); + if (sock_type == -1) { + cerr << "init_zmq_port: unknown socket type '" << socket_type_str << "'" << endl; + return; + } + zmq_ports[port_name] = new concore_base::ZeroMQPort(port_type, address, sock_type); + } + + /** + * @brief Reads data from a ZMQ port. Strips simtime prefix, updates simtime. + * @param port_name The ZMQ port name. + * @param name The name of the file. + * @param initstr The initial string. + * @return a vector of double values + */ + vector read_ZMQ(string port_name, string name, string initstr) { + auto it = zmq_ports.find(port_name); + if (it == zmq_ports.end()) { + cerr << "read_ZMQ: port '" << port_name << "' not initialized" << endl; + return parser(initstr); + } + vector inval = it->second->recv_with_retry(); + if (inval.empty()) + inval = parser(initstr); + if (inval.empty()) return inval; + simtime = simtime > inval[0] ? simtime : inval[0]; + s += port_name; + inval.erase(inval.begin()); + return inval; + } + + /** + * @brief Writes a vector of double values to a ZMQ port. Prepends simtime+delta. + * @param port_name The ZMQ port name. + * @param name The name of the file. + * @param val The vector of double values to write. + * @param delta The delta value (default: 0). + */ + void write_ZMQ(string port_name, string name, vector val, int delta=0) { + auto it = zmq_ports.find(port_name); + if (it == zmq_ports.end()) { + cerr << "write_ZMQ: port '" << port_name << "' not initialized" << endl; + return; + } + val.insert(val.begin(), simtime + delta); + it->second->send_with_retry(val); + // simtime must not be mutated here (issue #385). + } + + /** + * @brief Writes a string to a ZMQ port. + * @param port_name The ZMQ port name. + * @param name The name of the file. + * @param val The string to write. + * @param delta The delta value (default: 0). + */ + void write_ZMQ(string port_name, string name, string val, int delta=0) { + auto it = zmq_ports.find(port_name); + if (it == zmq_ports.end()) { + cerr << "write_ZMQ: port '" << port_name << "' not initialized" << endl; + return; + } + chrono::milliseconds timespan((int)(2000*delay)); + this_thread::sleep_for(timespan); + it->second->send_string_with_retry(val); + } + + /** + * @brief deviate the read to ZMQ communication protocol when port identifier is a string key. + * @param port_name The ZMQ port name. + * @param name The name of the file. + * @param initstr The initial string. + * @return + */ + vector read(string port_name, string name, string initstr) { + return read_ZMQ(port_name, name, initstr); + } + + /** + * @brief deviate the write to ZMQ communication protocol when port identifier is a string key. + * @param port_name The ZMQ port name. + * @param name The name of the file. + * @param val The vector of double values to write. + * @param delta The delta value (default: 0). + */ + void write(string port_name, string name, vector val, int delta=0) { + return write_ZMQ(port_name, name, val, delta); + } + + /** + * @brief deviate the write to ZMQ communication protocol when port identifier is a string key. + * @param port_name The ZMQ port name. + * @param name The name of the file. + * @param val The string to write. + * @param delta The delta value (default: 0). + */ + void write(string port_name, string name, string val, int delta=0) { + return write_ZMQ(port_name, name, val, delta); + } +#endif // CONCORE_USE_ZMQ + /** * @brief Strips leading and trailing whitespace from a string. * @param str The input string. diff --git a/concore_base.hpp b/concore_base.hpp index 6479942..9018f61 100644 --- a/concore_base.hpp +++ b/concore_base.hpp @@ -178,6 +178,117 @@ inline std::string tryparam( return (it != params.end()) ? it->second : defaultValue; } + +// =================================================================== +// ZeroMQ Transport (opt-in: compile with -DCONCORE_USE_ZMQ) +// =================================================================== +#ifdef CONCORE_USE_ZMQ +#include + +/** + * ZMQ socket wrapper with bind/connect, timeouts, and retry. + */ +class ZeroMQPort { +public: + zmq::context_t context; + zmq::socket_t socket; + std::string port_type; + std::string address; + + ZeroMQPort(const std::string& port_type_, const std::string& address_, int socket_type) + : context(1), socket(context, socket_type), + port_type(port_type_), address(address_) + { + socket.setsockopt(ZMQ_RCVTIMEO, 2000); + socket.setsockopt(ZMQ_SNDTIMEO, 2000); + socket.setsockopt(ZMQ_LINGER, 0); + + if (port_type == "bind") + socket.bind(address); + else + socket.connect(address); + } + + ZeroMQPort(const ZeroMQPort&) = delete; + ZeroMQPort& operator=(const ZeroMQPort&) = delete; + + /** + * Sends a vector as "[v0, v1, ...]" with retry on timeout. + */ + void send_with_retry(const std::vector& payload) { + std::ostringstream ss; + ss << "["; + for (size_t i = 0; i < payload.size(); ++i) { + if (i) ss << ", "; + ss << payload[i]; + } + ss << "]"; + std::string msg = ss.str(); + for (int attempt = 0; attempt < 5; ++attempt) { + try { + zmq::message_t zmsg(msg.begin(), msg.end()); + socket.send(zmsg, zmq::send_flags::none); + return; + } catch (const zmq::error_t&) { + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + } + } + std::cerr << "ZMQ send failed after retries." << std::endl; + } + + /** + * Sends a raw string with retry on timeout. + */ + void send_string_with_retry(const std::string& msg) { + for (int attempt = 0; attempt < 5; ++attempt) { + try { + zmq::message_t zmsg(msg.begin(), msg.end()); + socket.send(zmsg, zmq::send_flags::none); + return; + } catch (const zmq::error_t&) { + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + } + } + std::cerr << "ZMQ send failed after retries." << std::endl; + } + + /** + * Receives and parses "[v0, v1, ...]" back to vector. + */ + std::vector recv_with_retry() { + for (int attempt = 0; attempt < 5; ++attempt) { + try { + zmq::message_t zmsg; + auto res = socket.recv(zmsg, zmq::recv_flags::none); + if (res) { + std::string data(static_cast(zmsg.data()), zmsg.size()); + return parselist_double(data); + } + } catch (const zmq::error_t&) { + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + } + } + std::cerr << "ZMQ recv failed after retries." << std::endl; + return {}; + } +}; + +/** + * Maps socket type string ("REQ", "REP", etc.) to ZMQ constant. + * Returns -1 on unknown type. + */ +inline int zmq_socket_type_from_string(const std::string& s) { + if (s == "REQ") return ZMQ_REQ; + if (s == "REP") return ZMQ_REP; + if (s == "PUB") return ZMQ_PUB; + if (s == "SUB") return ZMQ_SUB; + if (s == "PUSH") return ZMQ_PUSH; + if (s == "PULL") return ZMQ_PULL; + if (s == "PAIR") return ZMQ_PAIR; + return -1; +} +#endif // CONCORE_USE_ZMQ + } // namespace concore_base #endif // CONCORE_BASE_HPP