diff --git a/Dockerfile.java b/Dockerfile.java index a3eb3ad..b78f1e6 100644 --- a/Dockerfile.java +++ b/Dockerfile.java @@ -1,17 +1,14 @@ #build stage -FROM maven:3.9-eclipse-temurin-17-alpine AS builder +FROM eclipse-temurin:17-jdk-alpine AS builder WORKDIR /build -COPY pom.xml . -RUN mvn dependency:go-offline -B -COPY src ./src -RUN mvn clean package -DskipTests +RUN apk add --no-cache wget +RUN wget -q "https://search.maven.org/remotecontent?filepath=org/zeromq/jeromq/0.6.0/jeromq-0.6.0.jar" -O /opt/jeromq.jar +COPY *.java . +RUN javac -cp /opt/jeromq.jar *.java #runtime stage -FROM eclipse-temurin:17-jdk-alpine +FROM eclipse-temurin:17-jre-alpine WORKDIR /app - -# Copy the JAR from the build stage -COPY --from=builder /build/target/concore-*.jar /app/concore.jar -EXPOSE 3000 -CMD ["java", "-jar", "/app/concore.jar"] \ No newline at end of file +COPY --from=builder /build/*.class /app/ +COPY --from=builder /opt/jeromq.jar /app/jeromq.jar \ No newline at end of file diff --git a/concoredocker.java b/concoredocker.java index 96cf6ee..226865f 100644 --- a/concoredocker.java +++ b/concoredocker.java @@ -5,6 +5,7 @@ import java.util.Map; import java.util.ArrayList; import java.util.List; +import org.zeromq.ZMQ; /** * Java implementation of concore Docker communication. @@ -24,6 +25,8 @@ public class concoredocker { private static String inpath = "/in"; private static String outpath = "/out"; private static Map params = new HashMap<>(); + private static Map zmqPorts = new HashMap<>(); + private static ZMQ.Context zmqContext = null; // simtime as double to preserve fractional values (e.g. "[0.0, ...]") private static double simtime = 0; private static double maxtime; @@ -49,6 +52,7 @@ public class concoredocker { params = new HashMap<>(); } defaultMaxTime(100); + Runtime.getRuntime().addShutdownHook(new Thread(concoredocker::terminateZmq)); } /** @@ -292,7 +296,7 @@ public static void write(int port, String name, Object val, int delta) { } content.append("]"); // simtime must not be mutated here. - // Mutation breaks cross-language determinism (see issue #385). + // Mutation breaks cross-language determinism. } else if (val instanceof Object[]) { // Legacy support for Object[] arguments Object[] arrayVal = (Object[]) val; @@ -304,7 +308,7 @@ public static void write(int port, String name, Object val, int delta) { } content.append("]"); // simtime must not be mutated here. - // Mutation breaks cross-language determinism (see issue #385). + // Mutation breaks cross-language determinism. } else { System.out.println("write must have list or str"); return; @@ -336,6 +340,113 @@ public static List initVal(String simtimeVal) { return val; } + private static ZMQ.Context getZmqContext() { + if (zmqContext == null) { + zmqContext = ZMQ.context(1); + } + return zmqContext; + } + + public static void initZmqPort(String portName, String portType, String address, String socketTypeStr) { + if (zmqPorts.containsKey(portName)) return; + int sockType = zmqSocketTypeFromString(socketTypeStr); + if (sockType == -1) { + System.err.println("initZmqPort: unknown socket type '" + socketTypeStr + "'"); + return; + } + zmqPorts.put(portName, new ZeroMQPort(portType, address, sockType)); + } + + public static void terminateZmq() { + for (Map.Entry entry : zmqPorts.entrySet()) { + entry.getValue().socket.close(); + } + zmqPorts.clear(); + if (zmqContext != null) { + zmqContext.term(); + zmqContext = null; + } + } + + private static int zmqSocketTypeFromString(String s) { + switch (s.toUpperCase()) { + case "REQ": return ZMQ.REQ; + case "REP": return ZMQ.REP; + case "PUB": return ZMQ.PUB; + case "SUB": return ZMQ.SUB; + case "PUSH": return ZMQ.PUSH; + case "PULL": return ZMQ.PULL; + case "PAIR": return ZMQ.PAIR; + default: return -1; + } + } + + /** + * Reads data from a ZMQ port. Same wire format as file-based read: + * expects [simtime, val1, val2, ...], strips simtime, returns the rest. + */ + public static List read(String portName, String name, String initstr) { + List defaultVal = new ArrayList<>(); + try { + List parsed = (List) literalEval(initstr); + if (parsed.size() > 1) { + defaultVal = new ArrayList<>(parsed.subList(1, parsed.size())); + } + } catch (Exception e) { + } + ZeroMQPort port = zmqPorts.get(portName); + if (port == null) { + System.err.println("read: ZMQ port '" + portName + "' not initialized"); + return defaultVal; + } + String msg = port.recvWithRetry(); + if (msg == null) { + System.err.println("read: ZMQ recv timeout on port '" + portName + "'"); + return defaultVal; + } + s += msg; + try { + List inval = (List) literalEval(msg); + if (!inval.isEmpty()) { + simtime = Math.max(simtime, ((Number) inval.get(0)).doubleValue()); + return new ArrayList<>(inval.subList(1, inval.size())); + } + } catch (Exception e) { + System.out.println("Error parsing ZMQ message '" + msg + "': " + e.getMessage()); + } + return defaultVal; + } + + /** + * Writes data to a ZMQ port. Prepends [simtime+delta] to match file-based write behavior. + */ + public static void write(String portName, String name, Object val, int delta) { + ZeroMQPort port = zmqPorts.get(portName); + if (port == null) { + System.err.println("write: ZMQ port '" + portName + "' not initialized"); + return; + } + String payload; + if (val instanceof List) { + List listVal = (List) val; + StringBuilder sb = new StringBuilder("["); + sb.append(toPythonLiteral(simtime + delta)); + for (Object o : listVal) { + sb.append(", "); + sb.append(toPythonLiteral(o)); + } + sb.append("]"); + payload = sb.toString(); + // simtime must not be mutated here + } else if (val instanceof String) { + payload = (String) val; + } else { + System.out.println("write must have list or str"); + return; + } + port.sendWithRetry(payload); + } + /** * Parses a Python-literal string into Java objects using a recursive descent parser. * Supports: dict, list, int, float, string (single/double quoted), bool, None, nested structures. @@ -354,6 +465,44 @@ static Object literalEval(String s) { return result; } + /** + * ZMQ socket wrapper with bind/connect, timeouts, and retry. + */ + private static class ZeroMQPort { + final ZMQ.Socket socket; + final String address; + + ZeroMQPort(String portType, String address, int socketType) { + this.address = address; + ZMQ.Context ctx = getZmqContext(); + this.socket = ctx.socket(socketType); + this.socket.setReceiveTimeOut(2000); + this.socket.setSendTimeOut(2000); + this.socket.setLinger(0); + if (portType.equals("bind")) { + this.socket.bind(address); + } else { + this.socket.connect(address); + } + } + + String recvWithRetry() { + for (int attempt = 0; attempt < 5; attempt++) { + String msg = socket.recvStr(); + if (msg != null) return msg; + try { Thread.sleep(500); } catch (InterruptedException e) { Thread.currentThread().interrupt(); break; } + } + return null; + } + + void sendWithRetry(String message) { + for (int attempt = 0; attempt < 5; attempt++) { + if (socket.send(message)) return; + try { Thread.sleep(500); } catch (InterruptedException e) { Thread.currentThread().interrupt(); break; } + } + } + } + /** * Recursive descent parser for Python literal expressions. * Handles: dicts, lists, tuples, strings, numbers, booleans, None.