Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 8 additions & 11 deletions Dockerfile.java
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
#build stage
FROM maven:3.9-eclipse-temurin-17-alpine AS builder
FROM eclipse-temurin:17-jdk-alpine AS builder
WORKDIR /build
COPY pom.xml .
RUN mvn dependency:go-offline -B
COPY src ./src
RUN mvn clean package -DskipTests
RUN apk add --no-cache wget
RUN wget -q "https://search.maven.org/remotecontent?filepath=org/zeromq/jeromq/0.6.0/jeromq-0.6.0.jar" -O /opt/jeromq.jar
COPY *.java .
RUN javac -cp /opt/jeromq.jar *.java

#runtime stage
FROM eclipse-temurin:17-jdk-alpine
FROM eclipse-temurin:17-jre-alpine

WORKDIR /app

# Copy the JAR from the build stage
COPY --from=builder /build/target/concore-*.jar /app/concore.jar
EXPOSE 3000
CMD ["java", "-jar", "/app/concore.jar"]
COPY --from=builder /build/*.class /app/
COPY --from=builder /opt/jeromq.jar /app/jeromq.jar
153 changes: 151 additions & 2 deletions concoredocker.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.Map;
import java.util.ArrayList;
import java.util.List;
import org.zeromq.ZMQ;

/**
* Java implementation of concore Docker communication.
Expand All @@ -24,6 +25,8 @@ public class concoredocker {
private static String inpath = "/in";
private static String outpath = "/out";
private static Map<String, Object> params = new HashMap<>();
private static Map<String, ZeroMQPort> zmqPorts = new HashMap<>();
private static ZMQ.Context zmqContext = null;
// simtime as double to preserve fractional values (e.g. "[0.0, ...]")
private static double simtime = 0;
private static double maxtime;
Expand All @@ -49,6 +52,7 @@ public class concoredocker {
params = new HashMap<>();
}
defaultMaxTime(100);
Runtime.getRuntime().addShutdownHook(new Thread(concoredocker::terminateZmq));
}

/**
Expand Down Expand Up @@ -292,7 +296,7 @@ public static void write(int port, String name, Object val, int delta) {
}
content.append("]");
// simtime must not be mutated here.
// Mutation breaks cross-language determinism (see issue #385).
// Mutation breaks cross-language determinism.
} else if (val instanceof Object[]) {
// Legacy support for Object[] arguments
Object[] arrayVal = (Object[]) val;
Expand All @@ -304,7 +308,7 @@ public static void write(int port, String name, Object val, int delta) {
}
content.append("]");
// simtime must not be mutated here.
// Mutation breaks cross-language determinism (see issue #385).
// Mutation breaks cross-language determinism.
} else {
System.out.println("write must have list or str");
return;
Expand Down Expand Up @@ -336,6 +340,113 @@ public static List<Object> initVal(String simtimeVal) {
return val;
}

private static ZMQ.Context getZmqContext() {
if (zmqContext == null) {
zmqContext = ZMQ.context(1);
}
return zmqContext;
}

public static void initZmqPort(String portName, String portType, String address, String socketTypeStr) {
if (zmqPorts.containsKey(portName)) return;
int sockType = zmqSocketTypeFromString(socketTypeStr);
if (sockType == -1) {
System.err.println("initZmqPort: unknown socket type '" + socketTypeStr + "'");
return;
}
zmqPorts.put(portName, new ZeroMQPort(portType, address, sockType));
}

public static void terminateZmq() {
for (Map.Entry<String, ZeroMQPort> entry : zmqPorts.entrySet()) {
entry.getValue().socket.close();
}
zmqPorts.clear();
if (zmqContext != null) {
zmqContext.term();
zmqContext = null;
}
}

private static int zmqSocketTypeFromString(String s) {
switch (s.toUpperCase()) {
case "REQ": return ZMQ.REQ;
case "REP": return ZMQ.REP;
case "PUB": return ZMQ.PUB;
case "SUB": return ZMQ.SUB;
case "PUSH": return ZMQ.PUSH;
case "PULL": return ZMQ.PULL;
case "PAIR": return ZMQ.PAIR;
default: return -1;
}
}

/**
* Reads data from a ZMQ port. Same wire format as file-based read:
* expects [simtime, val1, val2, ...], strips simtime, returns the rest.
*/
public static List<Object> read(String portName, String name, String initstr) {
List<Object> defaultVal = new ArrayList<>();
try {
List<?> parsed = (List<?>) literalEval(initstr);
if (parsed.size() > 1) {
defaultVal = new ArrayList<>(parsed.subList(1, parsed.size()));
}
} catch (Exception e) {
}
ZeroMQPort port = zmqPorts.get(portName);
if (port == null) {
System.err.println("read: ZMQ port '" + portName + "' not initialized");
return defaultVal;
}
String msg = port.recvWithRetry();
if (msg == null) {
System.err.println("read: ZMQ recv timeout on port '" + portName + "'");
return defaultVal;
}
s += msg;
try {
List<?> inval = (List<?>) literalEval(msg);
if (!inval.isEmpty()) {
simtime = Math.max(simtime, ((Number) inval.get(0)).doubleValue());
return new ArrayList<>(inval.subList(1, inval.size()));
}
} catch (Exception e) {
System.out.println("Error parsing ZMQ message '" + msg + "': " + e.getMessage());
}
return defaultVal;
}

/**
* Writes data to a ZMQ port. Prepends [simtime+delta] to match file-based write behavior.
*/
public static void write(String portName, String name, Object val, int delta) {
ZeroMQPort port = zmqPorts.get(portName);
if (port == null) {
System.err.println("write: ZMQ port '" + portName + "' not initialized");
return;
}
String payload;
if (val instanceof List) {
List<?> listVal = (List<?>) val;
StringBuilder sb = new StringBuilder("[");
sb.append(toPythonLiteral(simtime + delta));
for (Object o : listVal) {
sb.append(", ");
sb.append(toPythonLiteral(o));
}
sb.append("]");
payload = sb.toString();
// simtime must not be mutated here
} else if (val instanceof String) {
payload = (String) val;
} else {
System.out.println("write must have list or str");
return;
}
port.sendWithRetry(payload);
}

/**
* Parses a Python-literal string into Java objects using a recursive descent parser.
* Supports: dict, list, int, float, string (single/double quoted), bool, None, nested structures.
Expand All @@ -354,6 +465,44 @@ static Object literalEval(String s) {
return result;
}

/**
* ZMQ socket wrapper with bind/connect, timeouts, and retry.
*/
private static class ZeroMQPort {
final ZMQ.Socket socket;
final String address;

ZeroMQPort(String portType, String address, int socketType) {
this.address = address;
ZMQ.Context ctx = getZmqContext();
this.socket = ctx.socket(socketType);
this.socket.setReceiveTimeOut(2000);
this.socket.setSendTimeOut(2000);
this.socket.setLinger(0);
if (portType.equals("bind")) {
this.socket.bind(address);
} else {
this.socket.connect(address);
}
}

String recvWithRetry() {
for (int attempt = 0; attempt < 5; attempt++) {
String msg = socket.recvStr();
if (msg != null) return msg;
try { Thread.sleep(500); } catch (InterruptedException e) { Thread.currentThread().interrupt(); break; }
}
return null;
}

void sendWithRetry(String message) {
for (int attempt = 0; attempt < 5; attempt++) {
if (socket.send(message)) return;
try { Thread.sleep(500); } catch (InterruptedException e) { Thread.currentThread().interrupt(); break; }
}
}
}

/**
* Recursive descent parser for Python literal expressions.
* Handles: dicts, lists, tuples, strings, numbers, booleans, None.
Expand Down