diff --git a/docs/packetized_serial_transport.md b/docs/packetized_serial_transport.md
new file mode 100644
index 00000000..59166d6e
--- /dev/null
+++ b/docs/packetized_serial_transport.md
@@ -0,0 +1,275 @@
+# Packetized Serial Transport
+
+The packetized serial transport (`packetized_serial_transport.c`) is a framing layer that sits on top of the [generic serial transport](../zcm/transport/generic_serial_transport.h). It fragments large ZCM messages into bounded packet bodies that fit within the inner transport's MTU, and reassembles them on the receiver side. This makes it possible to send messages larger than the serial MTU over byte-stream links.
+
+Messages are only packetized when their ZCM channel name begins with `$`. Packetized inner messages are sent on that same `$`-prefixed channel, and the `$` is stripped when the reassembled message is delivered. All other messages are forwarded directly to the inner transport unchanged, subject to the inner transport MTU.
+
+---
+
+## Packet Wire Format
+
+Every packetized message sent over the inner transport has a fixed 4-byte header followed by a type-specific body. The `body_len` field is one byte, so a packetized body can be at most 255 bytes.
+
+The main wire-format constants are:
+
+- **`PACKETIZED_HEADER_BYTES` (4)** — the outer header present on every packet: `type` (1) + `session_id` (2) + `body_len` (1).
+- **`PACKETIZED_MAX_BODY_BYTES` (255)** — the maximum body length represented by the one-byte `body_len` field.
+- **`PACKETIZED_DATA_OVERHEAD_BYTES` (2)** — the non-payload prefix inside every DATA body: `packet_id` (2).
+- **`PACKETIZED_MAX_PACKET_DATA_SIZE` (253)** — the maximum DATA payload bytes per packet: `PACKETIZED_MAX_BODY_BYTES - PACKETIZED_DATA_OVERHEAD_BYTES`.
+- **`PACKETIZED_RETRANS_MAX_IDS` (127)** — the maximum packet IDs in one RETRANS_REQ body: `(255 - 1) / 2`.
+
+The maximum usable payload per DATA packet is therefore `min(inner_mtu - PACKETIZED_HEADER_BYTES - PACKETIZED_DATA_OVERHEAD_BYTES, PACKETIZED_MAX_PACKET_DATA_SIZE)`. A configured payload size may choose any smaller valid value.
+
+### Header (4 bytes, always present)
+
+| Offset | Size | Field | Description |
+|--------|------|--------------|------------------------------------------|
+| 0 | 1 | `type` | Message type (see below) |
+| 1–2 | 2 | `session_id` | Big-endian uint16; identifies a transfer |
+| 3 | 1 | `body_len` | Number of body bytes that follow |
+
+### METADATA body (9 bytes) — type = 1
+
+Sent once at the start of each transfer. Contains everything the receiver needs to allocate state and validate the reassembled message. The checksum is Fletcher-16 over the full original message, seeded with `PACKETIZED_FLETCHER16_INIT` (`0xFFFF`).
+
+| Offset | Size | Field | Description |
+|--------|------|----------------------|-------------------------------------------|
+| 0–1 | 2 | `total_packets` | Big-endian uint16; number of DATA packets |
+| 2 | 1 | `tx_packet_data_size`| Payload bytes per DATA packet |
+| 3–6 | 4 | `total_message_size` | Big-endian uint32; full message bytes |
+| 7–8 | 2 | `checksum` | Big-endian Fletcher-16 of the full message|
+
+### DATA body (2 + N bytes) — type = 2
+
+One packet per chunk of the message. `N` can be zero for an empty message; otherwise `N <= tx_packet_data_size`, and the last packet may be shorter than earlier packets.
+
+| Offset | Size | Field | Description |
+|--------|------|-------------|------------------------------------------|
+| 0–1 | 2 | `packet_id` | Big-endian uint16; zero-based index |
+| 2–... | N | `payload` | Slice of the original message |
+
+### RETRANS_REQ body (1 + 2·count bytes) — type = 3
+
+Sent by the receiver when it detects missing packets after a timeout (200 ms). The receiver splits requests so each RETRANS_REQ fits both the inner MTU and the 255-byte body limit. A zero `count` is accepted as a no-op, although the current receiver only emits requests with missing IDs.
+
+| Offset | Size | Field | Description |
+|--------|------|-----------|----------------------------------------------|
+| 0 | 1 | `count` | Number of packet IDs being requested |
+| 1–2 | 2 | `id[0]` | Big-endian uint16 packet_id |
+| 3–4 | 2 | `id[1]` | … |
+
+---
+
+## Validation Rules
+
+Incoming packetized messages are parsed only on `$`-prefixed channels. Packets shorter than `PACKETIZED_HEADER_BYTES`, packets whose `body_len` does not match the received length, and unknown packet types are ignored.
+
+METADATA packets are accepted only when:
+
+- `body_len == PACKETIZED_METADATA_BODY_BYTES`.
+- `total_packets > 0` and `packet_data_size > 0`.
+- `packet_data_size <= PACKETIZED_MAX_PACKET_DATA_SIZE`.
+- `total_message_size <= zt->mtu`.
+- `total_packets == ceil(total_message_size / packet_data_size)`, with zero-length messages represented by one DATA packet.
+- The receive buffers are already large enough, or dynamic growth succeeds.
+
+DATA packets for inactive sessions or mismatched session IDs are ignored. DATA packets for the active session are invalid if the packet ID is out of range, if the body is shorter than the packet ID field, or if the payload length does not exactly match the expected chunk size for that packet. Duplicate DATA packets refresh the session timestamp, but do not rewrite data or increment `received_count`.
+
+RETRANS_REQ packets for inactive sessions or mismatched session IDs are ignored. RETRANS_REQ bodies must be exactly `1 + 2 * count` bytes, and `count` must fit the transmitter's retransmission ID buffer. Requested packet IDs outside the active transmit session are skipped when retransmissions are sent.
+
+`ZCM_EINVALID` from packet processing is treated as a dropped bad packet; the transport continues processing later packets.
+
+## Data Structures
+
+### `packetized_rx_state_t`
+
+Tracks the state of one active receive session.
+
+| Field | Type | Purpose |
+|---------------------|------------|---------------------------------------------------------------|
+| `active` | `int` | Non-zero while a session is being reassembled |
+| `channel[]` | `char` | ZCM channel name for the in-progress message |
+| `session_id` | `uint16_t` | Session ID from METADATA; used to match DATA packets |
+| `total_packets` | `uint16_t` | Expected number of DATA packets |
+| `packet_data_size` | `uint8_t` | Payload bytes per packet (from METADATA) |
+| `total_message_size`| `uint32_t` | Full message size in bytes |
+| `expected_checksum` | `uint16_t` | Fletcher-16 checksum to validate after reassembly |
+| `last_update_utime` | `uint64_t` | Timestamp of the last received DATA packet (retrans trigger) |
+| `data` | `uint8_t*` | Reassembly buffer — payload slices are written here |
+| `packet_received[]` | `uint8_t*` | Per-packet arrival flag (1 = received, 0 = missing) |
+| `received_count` | `uint16_t` | Number of DATA packets received so far |
+| `retrans_pending` | `int` | A RETRANS_REQ needs to be sent on the next update |
+| `missing_ids[]` | `uint16_t*`| List of packet IDs not yet received, built at retrans time |
+| `missing_count` | `uint16_t` | Number of entries in `missing_ids` |
+| `packet_capacity` | `size_t` | Allocated capacity for packet tracking arrays; kept in step with message buffer bytes |
+
+### `packetized_tx_state_t`
+
+Tracks the state of one active transmit session; kept alive to service retransmission requests.
+
+| Field | Type | Purpose |
+|----------------------|------------|--------------------------------------------------------------|
+| `active` | `int` | Non-zero while the session is live (awaiting possible retrans)|
+| `channel[]` | `char` | ZCM channel name of the outgoing message |
+| `session_id` | `uint16_t` | Session ID for this send; generated by incrementing `next_session_id` |
+| `total_packets` | `uint16_t` | Total DATA packets sent |
+| `packet_data_size` | `uint8_t` | Payload bytes per packet used for this session |
+| `total_message_size` | `uint32_t` | Full message size in bytes |
+| `data` | `uint8_t*` | Copy of the sent message, held for retransmission |
+| `retrans_ids[]` | `uint16_t*`| Packet IDs requested for retransmission by the receiver |
+| `retrans_count` | `uint16_t` | Number of pending retransmission requests |
+| `packet_capacity` | `size_t` | Allocated capacity for retransmit IDs; kept in step with message buffer bytes |
+
+### `zcm_trans_packetized_serial_t`
+
+The top-level transport object. Owns both the TX and RX state plus all shared resources.
+
+| Field | Type | Purpose |
+|------------------------------|---------------------|-----------------------------------------------------------------|
+| `trans` | `zcm_trans_t` | The vtable pointer — satisfies the `zcm_trans_t` interface |
+| `inner` | `zcm_trans_t*` | The underlying generic serial transport |
+| `inner_mtu` | `size_t` | MTU of the inner transport; sets the maximum packet size |
+| `mtu` | `size_t` | Max message size this transport advertises to ZCM |
+| `max_message_size` | `size_t` | Current reassembly/transmit buffer capacity |
+| `dynamic` | `int` | If non-zero, buffers grow automatically to fit larger messages |
+| `configured_packet_data_size`| `uint8_t` | Payload bytes per DATA packet (auto-selected or user-specified) |
+| `pkt_buf` / `pkt_buf_size` | `uint8_t*` / `size_t` | Scratch buffer for building one outbound inner-transport packet|
+| `out_buf` / `out_buf_size` | `uint8_t*` / `size_t` | One-slot output queue for pass-through or reassembled messages |
+| `out_len` / `out_utime` | `size_t` / `uint64_t` | Length and timestamp of the queued message |
+| `out_channel[]` | `char` | Channel of the queued message; `$` prefix stripped after reassembly |
+| `out_pending` | `int` | Non-zero if `out_buf` holds an undelivered message |
+| `next_session_id` | `uint16_t` | Incremented for each packetized send; wraps naturally |
+| `rx` | `packetized_rx_state_t` | Current receive session state |
+| `tx` | `packetized_tx_state_t` | Current transmit session state |
+| `time` / `time_usr` | function pointer | Callback for current timestamp in microseconds |
+
+---
+
+## Flowcharts
+
+### Sending a Message
+
+```
+packetized_serial_sendmsg(msg)
+ │
+ ▼
+send_pending_retransmissions()
+send_retrans_request()
+ │
+ ▼
+ channel starts with '$'?
+ │ │
+ No Yes
+ │ │
+ ▼ ▼
+send to inner validate size and grow buffers if needed
+transport tx_packet_data_size = configured_packet_data_size
+(pass-through) total_packets = ceil(msg.len / tx_packet_data_size)
+ zero-length messages still use one DATA packet
+ │
+ ▼
+ copy msg to tx->data
+ save active session in tx state
+ │
+ ▼
+ Build METADATA packet:
+ [total_packets | tx_packet_data_size |
+ total_message_size | Fletcher-16 checksum]
+ Send via inner transport
+ │
+ ▼
+ For packet_id = 0 .. total_packets-1:
+ slice = msg.buf[packet_id * chunk .. +chunk]
+ Build DATA packet: [packet_id | slice]
+ Send via inner transport
+ │
+ ▼
+ return ZCM_EOK
+```
+
+### Receiving a Message
+
+```
+packetized_serial_recvmsg()
+ │
+ ▼
+ out_pending? ──Yes──► deliver queued message, return ZCM_EOK
+ │
+ No
+ │
+ ▼
+ process_incoming_messages()
+ │
+ ▼
+ zcm_trans_recvmsg(inner) ──EAGAIN──► maybe_schedule_retrans_request()
+ │ │
+ packet timeout elapsed &
+ received packets missing?
+ │ │
+ ▼ Yes
+ channel starts with '$'? │
+ │ │ ▼
+ No Yes build missing_ids list
+ │ │ set retrans_pending = 1
+ ▼ ▼ │
+ queue output Parse header: ▼
+ (pass-through) type / session_id / body_len send_retrans_request()
+ │ send RETRANS_REQ packet(s)
+ ┌───────────────┼────────────────┐
+ ▼ ▼ ▼
+ METADATA DATA RETRANS_REQ
+ │ │ │
+ ▼ ▼ ▼
+ begin_rx_session process_rx_data process_retrans_request
+ │ │ store retrans_ids in tx state
+ validate params validate exact │
+ grow buffers payload length ▼
+ if needed write new slice (retransmissions sent on
+ store session into rx->data next sendmsg or update_tx)
+ state │
+ mark if new
+ │
+ ▼
+ all packets received?
+ │ │
+ No Yes
+ │ │
+ │ ▼
+ │ validate Fletcher-16 checksum
+ │ pass? fail?
+ │ │ │
+ │ ▼ ▼
+ │ queue_output rx_clear
+ │ rx_clear return EINVALID
+ │ │
+ └───────┘
+ │
+ ▼
+ deliver message
+ return ZCM_EOK
+```
+
+---
+
+## Retransmission
+
+When the receiver stops getting DATA packets mid-transfer, it detects the stall by comparing the current time against `rx->last_update_utime`. If the gap exceeds `PACKETIZED_RETRANS_TIMEOUT_US` (200 ms), it:
+
+1. Builds the list of missing `packet_id` values into `rx->missing_ids`.
+2. Sets `rx->retrans_pending = 1`.
+3. On the next call to `send_retrans_request`, sends one or more RETRANS_REQ packets (splitting across multiple inner-MTU packets if there are many missing IDs).
+
+`send_retrans_request` splits large missing-ID lists by the smaller of the inner MTU limit and `PACKETIZED_RETRANS_MAX_IDS`. On the transmitter side, `process_retrans_request` stores the requested IDs in `tx->retrans_ids`. They are sent by `send_pending_retransmissions`, which is called at the start of every `sendmsg` and `update_tx`.
+
+---
+
+## Buffer Sizing and Dynamic Growth
+
+At creation time, `zcm_trans_packetized_serial_create` receives a `tx_packet_data_size` argument (0 = auto-select the largest value that fits in the inner MTU) and a `max_message_size` argument (0 = dynamic).
+
+- The inner MTU must fit at least one DATA packet with one payload byte and one METADATA packet.
+- If `tx_packet_data_size == 0`, the transport chooses the largest DATA payload that fits the inner MTU, capped at `PACKETIZED_MAX_PACKET_DATA_SIZE`.
+- If `tx_packet_data_size > 0`, it must be no larger than `PACKETIZED_MAX_PACKET_DATA_SIZE` and `PACKETIZED_HEADER_BYTES + PACKETIZED_DATA_OVERHEAD_BYTES + tx_packet_data_size` must fit within the inner MTU.
+- The advertised packetized MTU is `configured_packet_data_size * PACKETIZED_MAX_PACKETS`, capped to `max_message_size` in fixed mode.
+- In **fixed** mode (`max_message_size > 0`), buffers are sized once at creation and packetized messages larger than `max_message_size` are rejected with `ZCM_EINVALID`.
+- In **dynamic** mode (`max_message_size == 0`), buffers start at `PACKETIZED_DEFAULT_MAX_MESSAGE_SIZE` and `grow_buffers()` is called whenever an incoming METADATA packet or outgoing message exceeds the current allocation. The RX data, RX packet flags, RX missing IDs, TX data, TX retransmit IDs, and output buffer are grown as needed. If growth fails, the operation returns `ZCM_EAGAIN`.
diff --git a/docs/transports.md b/docs/transports.md
index 4333c3cb..dc59293c 100644
--- a/docs/transports.md
+++ b/docs/transports.md
@@ -48,13 +48,13 @@ be used to *summon* the transport:
| Serial |
- serial://<path-to-device>?baud=<baud> |
- zcm_create("serial:///dev/ttyUSB0?baud=115200") |
+ serial://<path-to-device>?baud=<baud>[&pkt_size=<n>] |
+ zcm_create("serial:///dev/ttyUSB0?baud=115200"), zcm_create("serial:///dev/ttyUSB0?baud=115200&pkt_size=128") |
| CAN |
- can://<interface>?msgid=<id> |
- zcm_create("can://can0?msgid=65536") |
+ can://<interface>?msgid=<id>[&pkt_size=<n>] |
+ zcm_create("can://can0?msgid=65536"), zcm_create("can://can0?msgid=65536&pkt_size=32") |
| Inter-process via Shared Memory (IPCSHM) |
diff --git a/test/run-tests.sh b/test/run-tests.sh
index b33ca03e..3a644a5d 100755
--- a/test/run-tests.sh
+++ b/test/run-tests.sh
@@ -38,7 +38,7 @@ echo "**********************************"
$ROOTDIR/build/$BLD/test/runner
echo "Success"
-if [ -n "$1" ]; then
+if [ -n "${1:-}" ]; then
echo "Skipping non c/c++ lanugage tests in sanitizer mode"
exit 0
fi
diff --git a/test/zcm/PacketizedSerialTransportTest.hpp b/test/zcm/PacketizedSerialTransportTest.hpp
new file mode 100644
index 00000000..1fe127ea
--- /dev/null
+++ b/test/zcm/PacketizedSerialTransportTest.hpp
@@ -0,0 +1,544 @@
+#ifndef PACKETIZED_SERIAL_TRANSPORT_TEST_HPP
+#define PACKETIZED_SERIAL_TRANSPORT_TEST_HPP
+
+#include "cxxtest/TestSuite.h"
+#include "zcm/transport/packetized_serial_transport.h"
+#include "zcm/transport/packetized_serial_protocol.h"
+#include "zcm/zcm_coretypes.h"
+
+#include
+#include
+#include
+#include
+
+using std::string;
+using std::vector;
+
+struct PacketizedLinkEndpoint
+{
+ vector rx;
+ vector txParse;
+ PacketizedLinkEndpoint* peer{ nullptr };
+
+ bool dropEnabled{ false };
+ bool dropDone{ false };
+ uint16_t dropPacketId{ 0 };
+ string dropChannel;
+};
+
+// decode_frame parses one generic_serial framing layer frame from buf[start..].
+// The outer frame format is: 0xCC 0x00 chan_len data_len(4B) *chan *data checksum(2B),
+// with 0xCC bytes in chan/data escaped as 0xCC 0xCC.
+// NOTE: This mirrors the private framing in generic_serial_transport.c. If that
+// framing changes, this function must be updated accordingly.
+static bool decode_frame(const vector& buf, size_t start, size_t& frame_end,
+ string& channel, vector& data)
+{
+ if (buf.size() < start + 7) return false;
+ if (buf[start] != 0xcc || buf[start + 1] != 0x00) return false;
+
+ size_t cur = start + 2;
+ uint8_t chan_len = buf[cur++];
+ uint32_t data_len = ((uint32_t)buf[cur] << 24) | ((uint32_t)buf[cur + 1] << 16) |
+ ((uint32_t)buf[cur + 2] << 8) | (uint32_t)buf[cur + 3];
+ cur += 4;
+
+ channel.clear();
+ channel.reserve(chan_len);
+ for (uint8_t i = 0; i < chan_len; ++i) {
+ if (cur >= buf.size()) return false;
+ uint8_t c = buf[cur++];
+ if (c == 0xcc) {
+ if (cur >= buf.size()) return false;
+ uint8_t c2 = buf[cur++];
+ if (c2 != 0xcc) return false;
+ c = c2;
+ }
+ channel.push_back((char)c);
+ }
+
+ data.clear();
+ data.reserve(data_len);
+ for (uint32_t i = 0; i < data_len; ++i) {
+ if (cur >= buf.size()) return false;
+ uint8_t c = buf[cur++];
+ if (c == 0xcc) {
+ if (cur >= buf.size()) return false;
+ uint8_t c2 = buf[cur++];
+ if (c2 != 0xcc) return false;
+ c = c2;
+ }
+ data.push_back(c);
+ }
+
+ if (cur + 2 > buf.size()) return false;
+ frame_end = cur + 2;
+ return true;
+}
+
+static size_t endpoint_get(uint8_t* data, size_t nData, void* usr)
+{
+ auto* ep = (PacketizedLinkEndpoint*)usr;
+ size_t n = ep->rx.size() < nData ? ep->rx.size() : nData;
+ if (n == 0) return 0;
+ memcpy(data, ep->rx.data(), n);
+ ep->rx.erase(ep->rx.begin(), ep->rx.begin() + (ptrdiff_t)n);
+ return n;
+}
+
+static size_t endpoint_put(const uint8_t* data, size_t nData, void* usr)
+{
+ auto* ep = (PacketizedLinkEndpoint*)usr;
+ if (nData == 0) return 0;
+
+ ep->txParse.insert(ep->txParse.end(), data, data + nData);
+
+ size_t consumed = 0;
+ while (consumed < ep->txParse.size()) {
+ if (ep->txParse[consumed] != 0xcc) {
+ ++consumed;
+ continue;
+ }
+
+ string channel;
+ vector payload;
+ size_t frame_end = 0;
+ if (!decode_frame(ep->txParse, consumed, frame_end, channel, payload)) break;
+
+ bool drop = false;
+ if (ep->dropEnabled && !ep->dropDone && channel == ep->dropChannel &&
+ payload.size() >= PACKETIZED_HEADER_BYTES + PACKETIZED_DATA_OVERHEAD_BYTES) {
+ uint8_t type = payload[0];
+ uint8_t body_len = payload[3];
+ if (type == PACKETIZED_MSG_DATA &&
+ payload.size() == (size_t)PACKETIZED_HEADER_BYTES + body_len &&
+ body_len >= PACKETIZED_DATA_OVERHEAD_BYTES) {
+ uint16_t packet_id = 0;
+ __int16_t_decode_array(payload.data(), PACKETIZED_HEADER_BYTES, 2, (int16_t*)(&packet_id), 1);
+ if (packet_id == ep->dropPacketId) {
+ drop = true;
+ ep->dropDone = true;
+ }
+ }
+ }
+
+ if (!drop) {
+ ep->peer->rx.insert(ep->peer->rx.end(),
+ ep->txParse.begin() + (ptrdiff_t)consumed,
+ ep->txParse.begin() + (ptrdiff_t)frame_end);
+ }
+
+ consumed = frame_end;
+ }
+
+ if (consumed > 0) {
+ ep->txParse.erase(ep->txParse.begin(), ep->txParse.begin() + (ptrdiff_t)consumed);
+ }
+
+ return nData;
+}
+
+static uint64_t fake_now(void* usr) { return *(uint64_t*)usr; }
+
+static void pump(zcm_trans_t* tx, zcm_trans_t* rx, int iters)
+{
+ for (int i = 0; i < iters; ++i) {
+ TS_ASSERT_EQUALS(packetized_serial_update_tx(tx), ZCM_EOK);
+ TS_ASSERT_EQUALS(packetized_serial_update_rx(rx), ZCM_EOK);
+ }
+}
+
+// pump via zcm_trans_update (the vtable path used by TransportSerial/TransportCan wrappers)
+static void pumpViaUpdate(zcm_trans_t* tx, zcm_trans_t* rx, int iters)
+{
+ for (int i = 0; i < iters; ++i) {
+ TS_ASSERT_EQUALS(zcm_trans_update(tx), ZCM_EOK);
+ TS_ASSERT_EQUALS(zcm_trans_update(rx), ZCM_EOK);
+ }
+}
+
+class PacketizedSerialTransportTest : public CxxTest::TestSuite
+{
+ public:
+ void testPacketizedRoundTrip()
+ {
+ PacketizedLinkEndpoint a;
+ PacketizedLinkEndpoint b;
+ a.peer = &b;
+ b.peer = &a;
+
+ uint64_t now = 1000;
+ zcm_trans_t* tx = zcm_trans_packetized_serial_create(
+ endpoint_get, endpoint_put, &a, fake_now, &now, 64, 32768, 0, 1024);
+ zcm_trans_t* rx = zcm_trans_packetized_serial_create(
+ endpoint_get, endpoint_put, &b, fake_now, &now, 64, 32768, 0, 1024);
+ TSM_ASSERT("failed creating transports", tx && rx);
+
+ vector payload(512);
+ for (size_t i = 0; i < payload.size(); ++i) payload[i] = (uint8_t)(i ^ 0x5a);
+
+ zcm_msg_t msg;
+ msg.utime = now;
+ msg.channel = (char*)"$BIG";
+ msg.len = payload.size();
+ msg.buf = payload.data();
+
+ TS_ASSERT_EQUALS(zcm_trans_sendmsg(tx, msg), ZCM_EOK);
+ pump(tx, rx, 20);
+
+ zcm_msg_t out;
+ TS_ASSERT_EQUALS(zcm_trans_recvmsg(rx, &out, 0), ZCM_EOK);
+ TS_ASSERT_EQUALS(string(out.channel), string("BIG"));
+ TS_ASSERT_EQUALS(out.len, payload.size());
+ TS_ASSERT_SAME_DATA(out.buf, payload.data(), payload.size());
+
+ zcm_trans_destroy(tx);
+ zcm_trans_destroy(rx);
+ }
+
+ void testRetransmissionRequestPath()
+ {
+ PacketizedLinkEndpoint a;
+ PacketizedLinkEndpoint b;
+ a.peer = &b;
+ b.peer = &a;
+
+ a.dropEnabled = true;
+ a.dropPacketId = 1;
+ a.dropChannel = "$RETX";
+
+ uint64_t now = 2000;
+ zcm_trans_t* ta = zcm_trans_packetized_serial_create(
+ endpoint_get, endpoint_put, &a, fake_now, &now, 64, 32768, 0, 1024);
+ zcm_trans_t* tb = zcm_trans_packetized_serial_create(
+ endpoint_get, endpoint_put, &b, fake_now, &now, 64, 32768, 0, 1024);
+ TSM_ASSERT("failed creating transports", ta && tb);
+
+ vector payload(700);
+ for (size_t i = 0; i < payload.size(); ++i) payload[i] = (uint8_t)(i + 11);
+
+ zcm_msg_t m;
+ m.utime = now;
+ m.channel = (char*)"$RETX";
+ m.len = payload.size();
+ m.buf = payload.data();
+
+ TS_ASSERT_EQUALS(zcm_trans_sendmsg(ta, m), ZCM_EOK);
+ pump(ta, tb, 25);
+
+ zcm_msg_t out;
+ TS_ASSERT_EQUALS(zcm_trans_recvmsg(tb, &out, 0), ZCM_EAGAIN);
+
+ now += 300000;
+ TS_ASSERT_EQUALS(zcm_trans_recvmsg(tb, &out, 0), ZCM_EAGAIN);
+
+ pump(tb, ta, 10);
+ pump(ta, tb, 20);
+
+ bool gotRetx = false;
+ for (int i = 0; i < 6; ++i) {
+ int ret = zcm_trans_recvmsg(tb, &out, 0);
+ if (ret != ZCM_EOK) continue;
+ if (string(out.channel) == "RETX") {
+ gotRetx = true;
+ TS_ASSERT_EQUALS(out.len, payload.size());
+ TS_ASSERT_SAME_DATA(out.buf, payload.data(), payload.size());
+ break;
+ }
+ }
+ TS_ASSERT(gotRetx);
+ TS_ASSERT(a.dropDone);
+
+ zcm_trans_destroy(ta);
+ zcm_trans_destroy(tb);
+ }
+
+ void testConfiguredMaxMessageSize()
+ {
+ PacketizedLinkEndpoint a;
+ PacketizedLinkEndpoint b;
+ a.peer = &b;
+ b.peer = &a;
+
+ uint64_t now = 3000;
+ zcm_trans_t* tx = zcm_trans_packetized_serial_create(
+ endpoint_get, endpoint_put, &a, fake_now, &now, 64, 32768, 0, 128);
+ zcm_trans_t* rx = zcm_trans_packetized_serial_create(
+ endpoint_get, endpoint_put, &b, fake_now, &now, 64, 32768, 0, 128);
+ TSM_ASSERT("failed creating transports", tx && rx);
+
+ vector smallPayload(128, 0x5a);
+ zcm_msg_t small;
+ small.utime = now;
+ small.channel = (char*)"$SMALL";
+ small.len = smallPayload.size();
+ small.buf = smallPayload.data();
+ TS_ASSERT_EQUALS(zcm_trans_sendmsg(tx, small), ZCM_EOK);
+
+ vector largePayload(129, 0x6b);
+ zcm_msg_t large;
+ large.utime = now;
+ large.channel = (char*)"$LARGE";
+ large.len = largePayload.size();
+ large.buf = largePayload.data();
+ TS_ASSERT_EQUALS(zcm_trans_sendmsg(tx, large), ZCM_EINVALID);
+
+ zcm_trans_destroy(tx);
+ zcm_trans_destroy(rx);
+ }
+
+ void testAsymmetricPacketSizesRoundTrip()
+ {
+ PacketizedLinkEndpoint a;
+ PacketizedLinkEndpoint b;
+ a.peer = &b;
+ b.peer = &a;
+
+ uint64_t now = 4000;
+ zcm_trans_t* tx = zcm_trans_packetized_serial_create(
+ endpoint_get, endpoint_put, &a, fake_now, &now, 64, 32768, 8, 1024);
+ zcm_trans_t* rx = zcm_trans_packetized_serial_create(
+ endpoint_get, endpoint_put, &b, fake_now, &now, 64, 32768, 32, 1024);
+ TSM_ASSERT("failed creating transports", tx && rx);
+
+ vector payload(512);
+ for (size_t i = 0; i < payload.size(); ++i) payload[i] = (uint8_t)(i ^ 0x33);
+
+ zcm_msg_t msg;
+ msg.utime = now;
+ msg.channel = (char*)"$ASYM";
+ msg.len = payload.size();
+ msg.buf = payload.data();
+
+ TS_ASSERT_EQUALS(zcm_trans_sendmsg(tx, msg), ZCM_EOK);
+ pump(tx, rx, 40);
+
+ zcm_msg_t out;
+ TS_ASSERT_EQUALS(zcm_trans_recvmsg(rx, &out, 0), ZCM_EOK);
+ TS_ASSERT_EQUALS(string(out.channel), string("ASYM"));
+ TS_ASSERT_EQUALS(out.len, payload.size());
+ TS_ASSERT_SAME_DATA(out.buf, payload.data(), payload.size());
+
+ zcm_trans_destroy(tx);
+ zcm_trans_destroy(rx);
+ }
+
+ void testAsymmetricPacketSizesReverseRoundTrip()
+ {
+ PacketizedLinkEndpoint a;
+ PacketizedLinkEndpoint b;
+ a.peer = &b;
+ b.peer = &a;
+
+ uint64_t now = 5000;
+ zcm_trans_t* tx = zcm_trans_packetized_serial_create(
+ endpoint_get, endpoint_put, &a, fake_now, &now, 64, 32768, 32, 1024);
+ zcm_trans_t* rx = zcm_trans_packetized_serial_create(
+ endpoint_get, endpoint_put, &b, fake_now, &now, 64, 32768, 8, 1024);
+ TSM_ASSERT("failed creating transports", tx && rx);
+
+ vector payload(512);
+ for (size_t i = 0; i < payload.size(); ++i) payload[i] = (uint8_t)(i ^ 0x77);
+
+ zcm_msg_t msg;
+ msg.utime = now;
+ msg.channel = (char*)"$ASYM_REV";
+ msg.len = payload.size();
+ msg.buf = payload.data();
+
+ TS_ASSERT_EQUALS(zcm_trans_sendmsg(tx, msg), ZCM_EOK);
+ pump(tx, rx, 40);
+
+ zcm_msg_t out;
+ TS_ASSERT_EQUALS(zcm_trans_recvmsg(rx, &out, 0), ZCM_EOK);
+ TS_ASSERT_EQUALS(string(out.channel), string("ASYM_REV"));
+ TS_ASSERT_EQUALS(out.len, payload.size());
+ TS_ASSERT_SAME_DATA(out.buf, payload.data(), payload.size());
+
+ zcm_trans_destroy(tx);
+ zcm_trans_destroy(rx);
+ }
+
+ void testNonPacketizedPassthroughIgnoresPacketizedSizeLimit()
+ {
+ PacketizedLinkEndpoint a;
+ PacketizedLinkEndpoint b;
+ a.peer = &b;
+ b.peer = &a;
+
+ uint64_t now = 5500;
+ zcm_trans_t* tx = zcm_trans_packetized_serial_create(
+ endpoint_get, endpoint_put, &a, fake_now, &now, 2048, 32768, 8, 128);
+ zcm_trans_t* rx = zcm_trans_packetized_serial_create(
+ endpoint_get, endpoint_put, &b, fake_now, &now, 2048, 32768, 8, 128);
+ TSM_ASSERT("failed creating transports", tx && rx);
+
+ vector payload(512);
+ for (size_t i = 0; i < payload.size(); ++i) payload[i] = (uint8_t)(i ^ 0x19);
+
+ zcm_msg_t msg;
+ msg.utime = now;
+ msg.channel = (char*)"PLAIN";
+ msg.len = payload.size();
+ msg.buf = payload.data();
+
+ TS_ASSERT_EQUALS(zcm_trans_sendmsg(tx, msg), ZCM_EOK);
+ pump(tx, rx, 20);
+
+ zcm_msg_t out;
+ TS_ASSERT_EQUALS(zcm_trans_recvmsg(rx, &out, 0), ZCM_EOK);
+ TS_ASSERT_EQUALS(string(out.channel), string("PLAIN"));
+ TS_ASSERT_EQUALS(out.len, payload.size());
+ TS_ASSERT_SAME_DATA(out.buf, payload.data(), payload.size());
+
+ zcm_trans_destroy(tx);
+ zcm_trans_destroy(rx);
+ }
+
+ void testPacketizedMessageStillRespectsConfiguredSizeLimit()
+ {
+ PacketizedLinkEndpoint a;
+ PacketizedLinkEndpoint b;
+ a.peer = &b;
+ b.peer = &a;
+
+ uint64_t now = 5600;
+ zcm_trans_t* tx = zcm_trans_packetized_serial_create(
+ endpoint_get, endpoint_put, &a, fake_now, &now, 2048, 32768, 8, 128);
+ zcm_trans_t* rx = zcm_trans_packetized_serial_create(
+ endpoint_get, endpoint_put, &b, fake_now, &now, 2048, 32768, 8, 128);
+ TSM_ASSERT("failed creating transports", tx && rx);
+
+ vector payload(129, 0x2a);
+ zcm_msg_t msg;
+ msg.utime = now;
+ msg.channel = (char*)"$TOO_BIG";
+ msg.len = payload.size();
+ msg.buf = payload.data();
+
+ TS_ASSERT_EQUALS(zcm_trans_sendmsg(tx, msg), ZCM_EINVALID);
+
+ zcm_trans_destroy(tx);
+ zcm_trans_destroy(rx);
+ }
+
+ // Passing max_message_size=0 enables dynamic mode. Buffers start at
+ // PACKETIZED_DEFAULT_MAX_MESSAGE_SIZE (1024) and grow to fit any message.
+ void testDynamicResizingRoundTrip()
+ {
+ PacketizedLinkEndpoint a;
+ PacketizedLinkEndpoint b;
+ a.peer = &b;
+ b.peer = &a;
+
+ uint64_t now = 7000;
+ zcm_trans_t* tx = zcm_trans_packetized_serial_create(
+ endpoint_get, endpoint_put, &a, fake_now, &now, 64, 32768, 0, 0);
+ zcm_trans_t* rx = zcm_trans_packetized_serial_create(
+ endpoint_get, endpoint_put, &b, fake_now, &now, 64, 32768, 0, 0);
+ TSM_ASSERT("failed creating transports", tx && rx);
+
+ // 2048 bytes exceeds PACKETIZED_DEFAULT_MAX_MESSAGE_SIZE (1024)
+ vector payload(2048);
+ for (size_t i = 0; i < payload.size(); ++i) payload[i] = (uint8_t)(i ^ 0xd5);
+
+ zcm_msg_t msg;
+ msg.utime = now;
+ msg.channel = (char*)"$DYN";
+ msg.len = payload.size();
+ msg.buf = payload.data();
+
+ TS_ASSERT_EQUALS(zcm_trans_sendmsg(tx, msg), ZCM_EOK);
+ pump(tx, rx, 40);
+
+ zcm_msg_t out;
+ TS_ASSERT_EQUALS(zcm_trans_recvmsg(rx, &out, 0), ZCM_EOK);
+ TS_ASSERT_EQUALS(string(out.channel), string("DYN"));
+ TS_ASSERT_EQUALS(out.len, payload.size());
+ TS_ASSERT_SAME_DATA(out.buf, payload.data(), payload.size());
+
+ zcm_trans_destroy(tx);
+ zcm_trans_destroy(rx);
+ }
+
+ void testDynamicResizingMultipleGrowths()
+ {
+ PacketizedLinkEndpoint a;
+ PacketizedLinkEndpoint b;
+ a.peer = &b;
+ b.peer = &a;
+
+ uint64_t now = 7100;
+ zcm_trans_t* tx = zcm_trans_packetized_serial_create(
+ endpoint_get, endpoint_put, &a, fake_now, &now, 64, 32768, 0, 0);
+ zcm_trans_t* rx = zcm_trans_packetized_serial_create(
+ endpoint_get, endpoint_put, &b, fake_now, &now, 64, 32768, 0, 0);
+ TSM_ASSERT("failed creating transports", tx && rx);
+
+ static const size_t sizes[] = { 512, 2048, 8192 };
+ for (size_t si = 0; si < 3; ++si) {
+ vector payload(sizes[si]);
+ for (size_t i = 0; i < payload.size(); ++i) payload[i] = (uint8_t)(i ^ (uint8_t)si);
+
+ zcm_msg_t msg;
+ msg.utime = now;
+ msg.channel = (char*)"$GROW";
+ msg.len = payload.size();
+ msg.buf = payload.data();
+
+ TS_ASSERT_EQUALS(zcm_trans_sendmsg(tx, msg), ZCM_EOK);
+ pump(tx, rx, 200);
+
+ zcm_msg_t out;
+ TS_ASSERT_EQUALS(zcm_trans_recvmsg(rx, &out, 0), ZCM_EOK);
+ TS_ASSERT_EQUALS(string(out.channel), string("GROW"));
+ TS_ASSERT_EQUALS(out.len, payload.size());
+ TS_ASSERT_SAME_DATA(out.buf, payload.data(), payload.size());
+ }
+
+ zcm_trans_destroy(tx);
+ zcm_trans_destroy(rx);
+ }
+
+ // Drive the transport via zcm_trans_update (the vtable update path) rather than the
+ // explicit packetized_serial_update_rx/tx functions. This mirrors how the
+ // TransportSerial and TransportCan wrappers operate and would have caught the
+ // assertion failure caused by calling serial_update_rx/tx on a packetized transport.
+ void testRoundTripViaVtableUpdate()
+ {
+ PacketizedLinkEndpoint a;
+ PacketizedLinkEndpoint b;
+ a.peer = &b;
+ b.peer = &a;
+
+ uint64_t now = 6000;
+ zcm_trans_t* tx = zcm_trans_packetized_serial_create(
+ endpoint_get, endpoint_put, &a, fake_now, &now, 64, 32768, 8, 1024);
+ zcm_trans_t* rx = zcm_trans_packetized_serial_create(
+ endpoint_get, endpoint_put, &b, fake_now, &now, 64, 32768, 8, 1024);
+ TSM_ASSERT("failed creating transports", tx && rx);
+
+ vector payload(256);
+ for (size_t i = 0; i < payload.size(); ++i) payload[i] = (uint8_t)(i ^ 0xab);
+
+ zcm_msg_t msg;
+ msg.utime = now;
+ msg.channel = (char*)"$VTABLE";
+ msg.len = payload.size();
+ msg.buf = payload.data();
+
+ TS_ASSERT_EQUALS(zcm_trans_sendmsg(tx, msg), ZCM_EOK);
+ pumpViaUpdate(tx, rx, 40);
+
+ zcm_msg_t out;
+ TS_ASSERT_EQUALS(zcm_trans_recvmsg(rx, &out, 0), ZCM_EOK);
+ TS_ASSERT_EQUALS(string(out.channel), string("VTABLE"));
+ TS_ASSERT_EQUALS(out.len, payload.size());
+ TS_ASSERT_SAME_DATA(out.buf, payload.data(), payload.size());
+
+ zcm_trans_destroy(tx);
+ zcm_trans_destroy(rx);
+ }
+};
+
+#endif
diff --git a/test/zcm/wscript b/test/zcm/wscript
index c0c6f672..fb958233 100644
--- a/test/zcm/wscript
+++ b/test/zcm/wscript
@@ -7,3 +7,12 @@ def build(ctx):
source = 'multi_file.cpp',
rpath = ctx.env.RPATH_zcm,
install_path = None)
+
+ ctx.stlib(target = 'packetized_test_support',
+ use = 'default',
+ source = [
+ '../../zcm/transport/packetized_serial_transport.c',
+ '../../zcm/transport/generic_serial_transport.c',
+ '../../zcm/transport/generic_serial_circ_buff.c',
+ ],
+ install_path = None)
diff --git a/wscript b/wscript
index 1c97d910..6af0c9c4 100644
--- a/wscript
+++ b/wscript
@@ -482,7 +482,7 @@ def build(ctx):
if ctx.env.USING_CXXTEST:
ctx.cxxtest(use = ['zcm', 'zcm_tools_util',
'testzcmtypes', 'testzcmtypes_cpp', 'testzcmtypes_c_stlib',
- 'multifile_lib'])
+ 'multifile_lib', 'packetized_test_support'])
else:
ctx.recurse('scripts')
ctx.recurse('zcm')
diff --git a/zcm/transport/generic_serial_fletcher.h b/zcm/transport/generic_serial_fletcher.h
index f06e7de8..321dcd0d 100644
--- a/zcm/transport/generic_serial_fletcher.h
+++ b/zcm/transport/generic_serial_fletcher.h
@@ -19,4 +19,11 @@ static inline uint16_t fletcherUpdate(uint8_t b, uint16_t prevSum)
return (sumHigh << 8) | sumLow;
}
+static inline uint16_t fletcher16(const uint8_t* data, size_t len, uint16_t prevSum)
+{
+ size_t i;
+ for (i = 0; i < len; ++i) prevSum = fletcherUpdate(data[i], prevSum);
+ return prevSum;
+}
+
#endif /* _ZCM_TRANS_NONBLOCKING_FLETCHER_H */
diff --git a/zcm/transport/packetized_serial_protocol.h b/zcm/transport/packetized_serial_protocol.h
new file mode 100644
index 00000000..1b67e72b
--- /dev/null
+++ b/zcm/transport/packetized_serial_protocol.h
@@ -0,0 +1,78 @@
+#ifndef _ZCM_TRANS_PACKETIZED_SERIAL_PROTOCOL_H
+#define _ZCM_TRANS_PACKETIZED_SERIAL_PROTOCOL_H
+
+#include
+#include
+
+#include "generic_serial_fletcher.h"
+
+#define PACKETIZED_U8_BYTES (1)
+#define PACKETIZED_U16_BYTES (2)
+#define PACKETIZED_U32_BYTES (4)
+#define PACKETIZED_MAX_BODY_BYTES ((size_t)UINT8_MAX)
+
+#define PACKETIZED_HEADER_TYPE_OFFSET (0)
+#define PACKETIZED_HEADER_TYPE_BYTES (PACKETIZED_U8_BYTES)
+#define PACKETIZED_HEADER_SESSION_ID_OFFSET \
+ (PACKETIZED_HEADER_TYPE_OFFSET + PACKETIZED_HEADER_TYPE_BYTES)
+#define PACKETIZED_HEADER_SESSION_ID_BYTES (PACKETIZED_U16_BYTES)
+#define PACKETIZED_HEADER_BODY_LEN_OFFSET \
+ (PACKETIZED_HEADER_SESSION_ID_OFFSET + PACKETIZED_HEADER_SESSION_ID_BYTES)
+#define PACKETIZED_HEADER_BODY_LEN_BYTES (PACKETIZED_U8_BYTES)
+#define PACKETIZED_HEADER_BYTES \
+ (PACKETIZED_HEADER_BODY_LEN_OFFSET + PACKETIZED_HEADER_BODY_LEN_BYTES)
+
+#define PACKETIZED_DATA_PACKET_ID_OFFSET (0)
+#define PACKETIZED_DATA_PACKET_ID_BYTES (PACKETIZED_U16_BYTES)
+#define PACKETIZED_DATA_PAYLOAD_OFFSET \
+ (PACKETIZED_DATA_PACKET_ID_OFFSET + PACKETIZED_DATA_PACKET_ID_BYTES)
+#define PACKETIZED_DATA_OVERHEAD_BYTES (PACKETIZED_DATA_PAYLOAD_OFFSET)
+#define PACKETIZED_MIN_DATA_PAYLOAD_BYTES (1)
+#define PACKETIZED_MAX_PACKET_DATA_SIZE \
+ (PACKETIZED_MAX_BODY_BYTES - PACKETIZED_DATA_OVERHEAD_BYTES)
+
+#define PACKETIZED_METADATA_TOTAL_PACKETS_OFFSET (0)
+#define PACKETIZED_METADATA_TOTAL_PACKETS_BYTES (PACKETIZED_U16_BYTES)
+#define PACKETIZED_METADATA_PACKET_DATA_SIZE_OFFSET \
+ (PACKETIZED_METADATA_TOTAL_PACKETS_OFFSET + PACKETIZED_METADATA_TOTAL_PACKETS_BYTES)
+#define PACKETIZED_METADATA_PACKET_DATA_SIZE_BYTES (PACKETIZED_U8_BYTES)
+#define PACKETIZED_METADATA_TOTAL_MESSAGE_SIZE_OFFSET \
+ (PACKETIZED_METADATA_PACKET_DATA_SIZE_OFFSET + PACKETIZED_METADATA_PACKET_DATA_SIZE_BYTES)
+#define PACKETIZED_METADATA_TOTAL_MESSAGE_SIZE_BYTES (PACKETIZED_U32_BYTES)
+#define PACKETIZED_METADATA_CHECKSUM_OFFSET \
+ (PACKETIZED_METADATA_TOTAL_MESSAGE_SIZE_OFFSET + PACKETIZED_METADATA_TOTAL_MESSAGE_SIZE_BYTES)
+#define PACKETIZED_METADATA_CHECKSUM_BYTES (PACKETIZED_U16_BYTES)
+#define PACKETIZED_METADATA_BODY_BYTES \
+ (PACKETIZED_METADATA_CHECKSUM_OFFSET + PACKETIZED_METADATA_CHECKSUM_BYTES)
+
+#define PACKETIZED_RETRANS_COUNT_OFFSET (0)
+#define PACKETIZED_RETRANS_COUNT_BYTES (PACKETIZED_U8_BYTES)
+#define PACKETIZED_RETRANS_IDS_OFFSET \
+ (PACKETIZED_RETRANS_COUNT_OFFSET + PACKETIZED_RETRANS_COUNT_BYTES)
+#define PACKETIZED_RETRANS_ID_BYTES (PACKETIZED_U16_BYTES)
+#define PACKETIZED_RETRANS_BODY_BYTES(count) \
+ (PACKETIZED_RETRANS_IDS_OFFSET + ((size_t)(count) * PACKETIZED_RETRANS_ID_BYTES))
+#define PACKETIZED_RETRANS_MAX_IDS \
+ ((PACKETIZED_MAX_BODY_BYTES - PACKETIZED_RETRANS_IDS_OFFSET) / PACKETIZED_RETRANS_ID_BYTES)
+
+#define PACKETIZED_RETRANS_TIMEOUT_US (200000)
+#define PACKETIZED_MAX_PACKETS ((size_t)UINT16_MAX)
+#define PACKETIZED_DEFAULT_MAX_MESSAGE_SIZE (1024)
+#define PACKETIZED_FLETCHER16_INIT (0xFFFFu)
+
+typedef enum packetized_msg_type_t
+{
+ PACKETIZED_MSG_METADATA = 1,
+ PACKETIZED_MSG_DATA = 2,
+ PACKETIZED_MSG_RETRANS_REQ = 3,
+} packetized_msg_type_t;
+
+static inline size_t packetized_message_packet_count(size_t message_size,
+ uint8_t packet_data_size)
+{
+ if (packet_data_size == 0) return 0;
+ if (message_size == 0) return 1;
+ return (message_size + packet_data_size - 1u) / packet_data_size;
+}
+
+#endif /* _ZCM_TRANS_PACKETIZED_SERIAL_PROTOCOL_H */
diff --git a/zcm/transport/packetized_serial_transport.c b/zcm/transport/packetized_serial_transport.c
new file mode 100644
index 00000000..27aca156
--- /dev/null
+++ b/zcm/transport/packetized_serial_transport.c
@@ -0,0 +1,789 @@
+#include "zcm/transport.h"
+#include "zcm/zcm.h"
+#include "zcm/zcm_coretypes.h"
+
+#include "generic_serial_transport.h"
+#include "packetized_serial_protocol.h"
+#include "packetized_serial_transport.h"
+
+#include
+#include
+#include
+#include
+
+#define PACKETIZED_CHANNEL_PREFIX '$'
+#define PACKETIZED_SEND_RETRY_COUNT (4)
+
+typedef struct packetized_rx_state_t packetized_rx_state_t;
+struct packetized_rx_state_t
+{
+ int active;
+ char channel[ZCM_CHANNEL_MAXLEN + 1];
+ uint16_t session_id;
+ uint16_t total_packets;
+ uint8_t packet_data_size;
+ uint32_t total_message_size;
+ uint16_t expected_checksum;
+ uint64_t last_update_utime;
+
+ uint8_t* data;
+ uint8_t* packet_received;
+ uint16_t received_count;
+
+ int retrans_pending;
+ uint16_t* missing_ids;
+ uint16_t missing_count;
+ size_t packet_capacity;
+};
+
+typedef struct packetized_tx_state_t packetized_tx_state_t;
+struct packetized_tx_state_t
+{
+ int active;
+ char channel[ZCM_CHANNEL_MAXLEN + 1];
+ uint16_t session_id;
+ uint16_t total_packets;
+ uint8_t packet_data_size;
+ uint32_t total_message_size;
+ uint8_t* data;
+
+ uint16_t* retrans_ids;
+ uint16_t retrans_count;
+ size_t packet_capacity;
+};
+
+typedef struct zcm_trans_packetized_serial_t zcm_trans_packetized_serial_t;
+struct zcm_trans_packetized_serial_t
+{
+ zcm_trans_t trans;
+ zcm_trans_t* inner;
+
+ size_t inner_mtu;
+ size_t mtu;
+ size_t max_message_size;
+ int dynamic;
+ uint8_t configured_packet_data_size;
+
+ uint8_t* pkt_buf;
+ size_t pkt_buf_size;
+
+ uint8_t* out_buf;
+ size_t out_buf_size;
+ size_t out_len;
+ uint64_t out_utime;
+ char out_channel[ZCM_CHANNEL_MAXLEN + 1];
+ int out_pending;
+
+ uint16_t next_session_id;
+ packetized_rx_state_t rx;
+ packetized_tx_state_t tx;
+
+ uint64_t (*time)(void* usr);
+ void* time_usr;
+};
+
+static zcm_trans_packetized_serial_t* cast(zcm_trans_t* zt);
+
+static int grow_buffers(zcm_trans_packetized_serial_t* zt, size_t new_size)
+{
+ uint8_t* rd = realloc(zt->rx.data, new_size);
+ uint8_t* rpr = realloc(zt->rx.packet_received, new_size * sizeof(uint8_t));
+ uint16_t* rmi = realloc(zt->rx.missing_ids, new_size * sizeof(uint16_t));
+ uint8_t* td = realloc(zt->tx.data, new_size);
+ uint16_t* tri = realloc(zt->tx.retrans_ids, new_size * sizeof(uint16_t));
+
+ if (rd) zt->rx.data = rd;
+ if (rpr) zt->rx.packet_received = rpr;
+ if (rmi) zt->rx.missing_ids = rmi;
+ if (td) zt->tx.data = td;
+ if (tri) zt->tx.retrans_ids = tri;
+
+ if (!rd || !rpr || !rmi || !td || !tri) return ZCM_EAGAIN;
+
+ size_t new_out = zt->inner_mtu > new_size ? zt->inner_mtu : new_size;
+ if (new_out > zt->out_buf_size) {
+ uint8_t* ob = realloc(zt->out_buf, new_out);
+ if (!ob) return ZCM_EAGAIN;
+ zt->out_buf = ob;
+ zt->out_buf_size = new_out;
+ }
+
+ zt->rx.packet_capacity = new_size;
+ zt->tx.packet_capacity = new_size;
+ zt->max_message_size = new_size;
+ return ZCM_EOK;
+}
+
+static int send_inner_with_retry(zcm_trans_packetized_serial_t* zt, zcm_msg_t msg)
+{
+ int i;
+ int ret = zcm_trans_sendmsg(zt->inner, msg);
+ if (ret == ZCM_EOK) return ZCM_EOK;
+ if (ret != ZCM_EAGAIN) return ret;
+
+ for (i = 0; i < PACKETIZED_SEND_RETRY_COUNT; ++i) {
+ zcm_trans_update(zt->inner);
+ ret = zcm_trans_sendmsg(zt->inner, msg);
+ if (ret == ZCM_EOK) return ZCM_EOK;
+ if (ret != ZCM_EAGAIN) return ret;
+ }
+
+ return ZCM_EAGAIN;
+}
+
+static int send_packet(zcm_trans_packetized_serial_t* zt, const char* channel,
+ uint16_t session_id, uint8_t type, const uint8_t* body,
+ uint8_t body_len)
+{
+ if (PACKETIZED_HEADER_BYTES + (size_t)body_len > zt->pkt_buf_size)
+ return ZCM_EINVALID;
+
+ zt->pkt_buf[PACKETIZED_HEADER_TYPE_OFFSET] = type;
+ __int16_t_encode_array(zt->pkt_buf, PACKETIZED_HEADER_SESSION_ID_OFFSET,
+ PACKETIZED_HEADER_SESSION_ID_BYTES, (int16_t*)(&session_id), 1);
+ zt->pkt_buf[PACKETIZED_HEADER_BODY_LEN_OFFSET] = body_len;
+ if (body_len > 0 && body != &zt->pkt_buf[PACKETIZED_HEADER_BYTES]) {
+ memcpy(&zt->pkt_buf[PACKETIZED_HEADER_BYTES], body, body_len);
+ }
+
+ zcm_msg_t out;
+ out.utime = 0;
+ out.channel = channel;
+ out.len = PACKETIZED_HEADER_BYTES + body_len;
+ out.buf = zt->pkt_buf;
+ return send_inner_with_retry(zt, out);
+}
+
+static void rx_clear(packetized_rx_state_t* rx)
+{
+ /* Resets all session state to idle. Allocated buffers (data, packet_received,
+ * missing_ids) are kept intact and reused for the next session. */
+ if (rx->packet_received && rx->total_packets > 0)
+ memset(rx->packet_received, 0, rx->total_packets);
+ rx->active = 0;
+ rx->channel[0] = '\0';
+ rx->session_id = 0;
+ rx->total_packets = 0;
+ rx->packet_data_size = 0;
+ rx->total_message_size = 0;
+ rx->expected_checksum = 0;
+ rx->last_update_utime = 0;
+ rx->received_count = 0;
+ rx->retrans_pending = 0;
+ rx->missing_count = 0;
+}
+
+static void tx_clear(packetized_tx_state_t* tx)
+{
+ /* Resets all session state to idle. Allocated buffers (data, retrans_ids)
+ * are kept intact and reused for the next session. */
+ tx->active = 0;
+ tx->channel[0] = '\0';
+ tx->session_id = 0;
+ tx->total_packets = 0;
+ tx->packet_data_size = 0;
+ tx->total_message_size = 0;
+ tx->retrans_count = 0;
+}
+
+static int is_packetized_channel(const char* channel)
+{
+ return channel && channel[0] == PACKETIZED_CHANNEL_PREFIX;
+}
+
+static int send_retrans_request(zcm_trans_packetized_serial_t* zt)
+{
+ size_t sent;
+ packetized_rx_state_t* rx = &zt->rx;
+ if (!rx->retrans_pending || !rx->active || rx->missing_count == 0) return ZCM_EOK;
+
+ size_t max_ids_per_req = (zt->inner_mtu > PACKETIZED_HEADER_BYTES +
+ PACKETIZED_RETRANS_IDS_OFFSET)
+ ? (zt->inner_mtu - PACKETIZED_HEADER_BYTES -
+ PACKETIZED_RETRANS_IDS_OFFSET) /
+ PACKETIZED_RETRANS_ID_BYTES
+ : 0;
+ if (max_ids_per_req > PACKETIZED_RETRANS_MAX_IDS)
+ max_ids_per_req = PACKETIZED_RETRANS_MAX_IDS;
+ if (max_ids_per_req == 0) return ZCM_EINVALID;
+
+ sent = 0;
+ while (sent < rx->missing_count) {
+ size_t i;
+ size_t remaining = rx->missing_count - sent;
+ size_t count = remaining < max_ids_per_req ? remaining : max_ids_per_req;
+ size_t body_len = PACKETIZED_RETRANS_BODY_BYTES(count);
+ uint8_t* body = zt->pkt_buf + PACKETIZED_HEADER_BYTES;
+ body[PACKETIZED_RETRANS_COUNT_OFFSET] = (uint8_t)count;
+ for (i = 0; i < count; ++i) {
+ __int16_t_encode_array(body, PACKETIZED_RETRANS_IDS_OFFSET +
+ i * PACKETIZED_RETRANS_ID_BYTES,
+ PACKETIZED_RETRANS_ID_BYTES,
+ (int16_t*)(&rx->missing_ids[sent + i]), 1);
+ }
+
+ int ret = send_packet(zt, rx->channel, rx->session_id, PACKETIZED_MSG_RETRANS_REQ,
+ body, (uint8_t)body_len);
+ if (ret != ZCM_EOK) return ret;
+ sent += count;
+ }
+
+ rx->retrans_pending = 0;
+ return ZCM_EOK;
+}
+
+static int queue_output(zcm_trans_packetized_serial_t* zt, const char* channel,
+ const uint8_t* data, size_t len, uint64_t utime,
+ int strip_packetized_prefix)
+{
+ if (zt->out_buf_size < len) {
+ return ZCM_EINVALID;
+ }
+
+ if (len > 0) memcpy(zt->out_buf, data, len);
+
+ if (strip_packetized_prefix && channel[0] == PACKETIZED_CHANNEL_PREFIX) channel += 1;
+ strncpy(zt->out_channel, channel, ZCM_CHANNEL_MAXLEN);
+ zt->out_channel[ZCM_CHANNEL_MAXLEN] = '\0';
+ zt->out_len = len;
+ zt->out_utime = utime;
+ zt->out_pending = 1;
+ return ZCM_EOK;
+}
+
+static void deliver_pending(zcm_trans_packetized_serial_t* zt, zcm_msg_t* msg)
+{
+ msg->utime = zt->out_utime;
+ msg->channel = zt->out_channel;
+ msg->len = zt->out_len;
+ msg->buf = zt->out_buf;
+ zt->out_pending = 0;
+}
+
+static size_t packetized_max_mtu(uint8_t packet_data_size)
+{
+ return (size_t)packet_data_size * PACKETIZED_MAX_PACKETS;
+}
+
+static int send_pending_retransmissions(zcm_trans_packetized_serial_t* zt)
+{
+ size_t i;
+ packetized_tx_state_t* tx = &zt->tx;
+ if (!tx->active || tx->retrans_count == 0) return ZCM_EOK;
+
+ size_t chunk = tx->packet_data_size;
+ uint8_t* body = zt->pkt_buf + PACKETIZED_HEADER_BYTES;
+
+ for (i = 0; i < tx->retrans_count; ++i) {
+ uint16_t packet_id = tx->retrans_ids[i];
+ if (packet_id >= tx->total_packets) continue;
+
+ uint32_t offset = (uint32_t)packet_id * (uint32_t)chunk;
+ uint32_t remaining = tx->total_message_size - offset;
+ uint8_t payload_len = (uint8_t)(remaining < chunk ? remaining : chunk);
+
+ __int16_t_encode_array(body, PACKETIZED_DATA_PACKET_ID_OFFSET,
+ PACKETIZED_DATA_PACKET_ID_BYTES, (int16_t*)(&packet_id), 1);
+ if (payload_len > 0)
+ memcpy(&body[PACKETIZED_DATA_PAYLOAD_OFFSET], tx->data + offset, payload_len);
+
+ int ret = send_packet(zt, tx->channel, tx->session_id, PACKETIZED_MSG_DATA, body,
+ (uint8_t)(PACKETIZED_DATA_OVERHEAD_BYTES + payload_len));
+ if (ret != ZCM_EOK) return ret;
+ }
+
+ tx->retrans_count = 0;
+ return ZCM_EOK;
+}
+
+static int begin_rx_session(zcm_trans_packetized_serial_t* zt, const char* channel,
+ uint16_t session_id, const uint8_t* body, size_t body_len,
+ uint64_t utime)
+{
+ if (body_len != PACKETIZED_METADATA_BODY_BYTES) return ZCM_EINVALID;
+
+ uint16_t total_packets = 0;
+ uint8_t packet_data_size = body[PACKETIZED_METADATA_PACKET_DATA_SIZE_OFFSET];
+ uint32_t total_message_size = 0;
+ uint16_t expected_checksum = 0;
+ if (__int16_t_decode_array(body, PACKETIZED_METADATA_TOTAL_PACKETS_OFFSET,
+ PACKETIZED_METADATA_TOTAL_PACKETS_BYTES,
+ (int16_t*)(&total_packets), 1) < 0)
+ return ZCM_EINVALID;
+ if (__int32_t_decode_array(body, PACKETIZED_METADATA_TOTAL_MESSAGE_SIZE_OFFSET,
+ PACKETIZED_METADATA_TOTAL_MESSAGE_SIZE_BYTES,
+ (int32_t*)(&total_message_size), 1) < 0)
+ return ZCM_EINVALID;
+ if (__int16_t_decode_array(body, PACKETIZED_METADATA_CHECKSUM_OFFSET,
+ PACKETIZED_METADATA_CHECKSUM_BYTES,
+ (int16_t*)(&expected_checksum), 1) < 0)
+ return ZCM_EINVALID;
+
+ if (total_packets == 0 || packet_data_size == 0) return ZCM_EINVALID;
+ if (packet_data_size > PACKETIZED_MAX_PACKET_DATA_SIZE) return ZCM_EINVALID;
+ if (total_message_size > zt->mtu) return ZCM_EINVALID;
+ if (total_message_size > zt->max_message_size) {
+ if (!zt->dynamic) return ZCM_EINVALID;
+ if (grow_buffers(zt, total_message_size) != ZCM_EOK) return ZCM_EAGAIN;
+ }
+
+ size_t expected_packets = packetized_message_packet_count(total_message_size,
+ packet_data_size);
+ if (expected_packets != total_packets) return ZCM_EINVALID;
+
+ rx_clear(&zt->rx);
+ packetized_rx_state_t* rx = &zt->rx;
+ if ((size_t)total_packets > rx->packet_capacity) {
+ if (!zt->dynamic) return ZCM_EINVALID;
+ if (grow_buffers(zt, total_message_size) != ZCM_EOK) return ZCM_EAGAIN;
+ }
+
+ strncpy(rx->channel, channel, ZCM_CHANNEL_MAXLEN);
+ rx->channel[ZCM_CHANNEL_MAXLEN] = '\0';
+ rx->active = 1;
+ rx->session_id = session_id;
+ rx->total_packets = total_packets;
+ rx->packet_data_size = packet_data_size;
+ rx->total_message_size = total_message_size;
+ rx->expected_checksum = expected_checksum;
+ rx->last_update_utime = utime;
+ return ZCM_EOK;
+}
+
+static int process_rx_data(zcm_trans_packetized_serial_t* zt, uint16_t session_id,
+ const uint8_t* body, size_t body_len, uint64_t utime)
+{
+ packetized_rx_state_t* rx = &zt->rx;
+ if (!rx->active || rx->session_id != session_id) return ZCM_EOK;
+ if (body_len < PACKETIZED_DATA_OVERHEAD_BYTES) return ZCM_EINVALID;
+
+ uint16_t packet_id = 0;
+ __int16_t_decode_array(body, PACKETIZED_DATA_PACKET_ID_OFFSET,
+ PACKETIZED_DATA_PACKET_ID_BYTES, (int16_t*)(&packet_id), 1);
+ uint8_t payload_len = (uint8_t)(body_len - PACKETIZED_DATA_OVERHEAD_BYTES);
+
+ if (packet_id >= rx->total_packets) return ZCM_EINVALID;
+
+ uint32_t offset = (uint32_t)packet_id * (uint32_t)rx->packet_data_size;
+ if (offset > rx->total_message_size) return ZCM_EINVALID;
+
+ uint32_t remaining = rx->total_message_size - offset;
+ uint8_t expected_len =
+ (uint8_t)(remaining < rx->packet_data_size ? remaining : rx->packet_data_size);
+ if (payload_len != expected_len) return ZCM_EINVALID;
+
+ if (!rx->packet_received[packet_id]) {
+ if (payload_len > 0)
+ memcpy(rx->data + offset, &body[PACKETIZED_DATA_PAYLOAD_OFFSET], payload_len);
+ rx->packet_received[packet_id] = 1;
+ ++rx->received_count;
+ }
+
+ rx->last_update_utime = utime;
+
+ if (rx->received_count == rx->total_packets) {
+ uint16_t checksum =
+ fletcher16(rx->data, rx->total_message_size, PACKETIZED_FLETCHER16_INIT);
+ if (checksum != rx->expected_checksum) {
+ rx_clear(rx);
+ return ZCM_EINVALID;
+ }
+
+ int ret = queue_output(zt, rx->channel, rx->data, rx->total_message_size, utime, 1);
+ if (ret != ZCM_EOK) {
+ rx_clear(rx);
+ return ret;
+ }
+
+ rx_clear(rx);
+ }
+
+ return ZCM_EOK;
+}
+
+static int process_retrans_request(zcm_trans_packetized_serial_t* zt, uint16_t session_id,
+ const uint8_t* body, size_t body_len)
+{
+ uint8_t i;
+ packetized_tx_state_t* tx = &zt->tx;
+ if (!tx->active || tx->session_id != session_id) return ZCM_EOK;
+ if (body_len < PACKETIZED_RETRANS_IDS_OFFSET) return ZCM_EINVALID;
+
+ uint8_t count = body[PACKETIZED_RETRANS_COUNT_OFFSET];
+ if (PACKETIZED_RETRANS_BODY_BYTES(count) != body_len) return ZCM_EINVALID;
+
+ tx->retrans_count = 0;
+ if (count == 0) return ZCM_EOK;
+
+ if ((size_t)count > tx->packet_capacity) return ZCM_EINVALID;
+ tx->retrans_count = count;
+ for (i = 0; i < count; ++i) {
+ uint16_t packet_id = 0;
+ __int16_t_decode_array(body, PACKETIZED_RETRANS_IDS_OFFSET +
+ (size_t)i * PACKETIZED_RETRANS_ID_BYTES,
+ PACKETIZED_RETRANS_ID_BYTES, (int16_t*)(&packet_id), 1);
+ tx->retrans_ids[i] = packet_id;
+ }
+
+ return ZCM_EOK;
+}
+
+static void maybe_schedule_retrans_request(zcm_trans_packetized_serial_t* zt,
+ uint64_t now)
+{
+ uint16_t i;
+ packetized_rx_state_t* rx = &zt->rx;
+ if (!rx->active || rx->retrans_pending) return;
+ if (rx->received_count == rx->total_packets) return;
+ if (now <= rx->last_update_utime) return;
+ if (now - rx->last_update_utime < PACKETIZED_RETRANS_TIMEOUT_US) return;
+
+ uint16_t missing_count = (uint16_t)(rx->total_packets - rx->received_count);
+ if (missing_count == 0) return;
+
+ uint16_t idx = 0;
+ for (i = 0; i < rx->total_packets; ++i) {
+ if (!rx->packet_received[i]) rx->missing_ids[idx++] = i;
+ }
+ rx->missing_count = idx;
+ rx->retrans_pending = idx > 0;
+}
+
+static size_t packetized_serial_get_mtu(zcm_trans_packetized_serial_t* zt) { return zt->mtu; }
+
+static int packetized_serial_sendmsg(zcm_trans_packetized_serial_t* zt, zcm_msg_t msg)
+{
+ uint16_t packet_id;
+ size_t chan_len = strlen(msg.channel);
+ if (chan_len > ZCM_CHANNEL_MAXLEN) return ZCM_EINVALID;
+
+ int ret = send_pending_retransmissions(zt);
+ if (ret != ZCM_EOK) return ret;
+ ret = send_retrans_request(zt);
+ if (ret != ZCM_EOK) return ret;
+
+ if (!is_packetized_channel(msg.channel)) {
+ if (msg.len > zt->inner_mtu) return ZCM_EINVALID;
+ return send_inner_with_retry(zt, msg);
+ }
+
+ uint8_t tx_packet_data_size = zt->configured_packet_data_size;
+ if (tx_packet_data_size == 0 || tx_packet_data_size > PACKETIZED_MAX_PACKET_DATA_SIZE)
+ return ZCM_EINVALID;
+ if (msg.len > zt->mtu) return ZCM_EINVALID;
+ if (msg.len > zt->max_message_size) {
+ if (!zt->dynamic) return ZCM_EINVALID;
+ if (grow_buffers(zt, msg.len) != ZCM_EOK) return ZCM_EAGAIN;
+ }
+
+ uint32_t total_message_size = (uint32_t)msg.len;
+ size_t total_packets_sz = packetized_message_packet_count(total_message_size,
+ tx_packet_data_size);
+ if (total_packets_sz == 0) total_packets_sz = 1;
+ if (total_packets_sz > PACKETIZED_MAX_PACKETS) return ZCM_EINVALID;
+ uint16_t total_packets = (uint16_t)total_packets_sz;
+
+ tx_clear(&zt->tx);
+ packetized_tx_state_t* tx = &zt->tx;
+ if (msg.len > 0) memcpy(tx->data, msg.buf, msg.len);
+
+ tx->active = 1;
+ tx->session_id = ++zt->next_session_id;
+ tx->packet_data_size = tx_packet_data_size;
+ tx->total_packets = total_packets;
+ tx->total_message_size = total_message_size;
+ strncpy(tx->channel, msg.channel, ZCM_CHANNEL_MAXLEN);
+ tx->channel[ZCM_CHANNEL_MAXLEN] = '\0';
+
+ uint8_t* meta = zt->pkt_buf + PACKETIZED_HEADER_BYTES;
+ uint16_t fletcherCsum = fletcher16(msg.buf, msg.len, PACKETIZED_FLETCHER16_INIT);
+ __int16_t_encode_array(meta, PACKETIZED_METADATA_TOTAL_PACKETS_OFFSET,
+ PACKETIZED_METADATA_TOTAL_PACKETS_BYTES,
+ (int16_t*)(&total_packets), 1);
+ meta[PACKETIZED_METADATA_PACKET_DATA_SIZE_OFFSET] = tx_packet_data_size;
+ __int32_t_encode_array(meta, PACKETIZED_METADATA_TOTAL_MESSAGE_SIZE_OFFSET,
+ PACKETIZED_METADATA_TOTAL_MESSAGE_SIZE_BYTES,
+ (int32_t*)(&total_message_size), 1);
+ __int16_t_encode_array(meta, PACKETIZED_METADATA_CHECKSUM_OFFSET,
+ PACKETIZED_METADATA_CHECKSUM_BYTES, (int16_t*)(&fletcherCsum), 1);
+ ret = send_packet(zt, tx->channel, tx->session_id, PACKETIZED_MSG_METADATA, meta,
+ PACKETIZED_METADATA_BODY_BYTES);
+ if (ret != ZCM_EOK) return ret;
+
+ uint8_t* body = zt->pkt_buf + PACKETIZED_HEADER_BYTES;
+ for (packet_id = 0; packet_id < total_packets; ++packet_id) {
+ uint32_t offset = (uint32_t)packet_id * (uint32_t)tx_packet_data_size;
+ uint32_t remaining = total_message_size - offset;
+ uint8_t payload_len =
+ (uint8_t)(remaining < tx_packet_data_size ? remaining : tx_packet_data_size);
+
+ __int16_t_encode_array(body, PACKETIZED_DATA_PACKET_ID_OFFSET,
+ PACKETIZED_DATA_PACKET_ID_BYTES, (int16_t*)(&packet_id), 1);
+ if (payload_len > 0)
+ memcpy(&body[PACKETIZED_DATA_PAYLOAD_OFFSET], tx->data + offset, payload_len);
+
+ ret = send_packet(zt, tx->channel, tx->session_id, PACKETIZED_MSG_DATA, body,
+ (uint8_t)(PACKETIZED_DATA_OVERHEAD_BYTES + payload_len));
+ if (ret != ZCM_EOK) return ret;
+ }
+
+ return ZCM_EOK;
+}
+
+static int packetized_serial_recvmsg_enable(zcm_trans_packetized_serial_t* zt,
+ const char* channel, bool enable)
+{
+ return zcm_trans_recvmsg_enable(zt->inner, channel, enable);
+}
+
+static int process_incoming_messages(zcm_trans_packetized_serial_t* zt)
+{
+ while (!zt->out_pending) {
+ zcm_msg_t in;
+ int ret = zcm_trans_recvmsg(zt->inner, &in, 0);
+ if (ret != ZCM_EOK) return ret;
+
+ if (!is_packetized_channel(in.channel)) {
+ return queue_output(zt, in.channel, in.buf, in.len, in.utime, 0);
+ }
+
+ if (in.len < PACKETIZED_HEADER_BYTES) continue;
+
+ uint8_t type = in.buf[PACKETIZED_HEADER_TYPE_OFFSET];
+ uint16_t session_id = 0;
+ __int16_t_decode_array(in.buf, PACKETIZED_HEADER_SESSION_ID_OFFSET,
+ PACKETIZED_HEADER_SESSION_ID_BYTES, (int16_t*)(&session_id), 1);
+ uint8_t body_len = in.buf[PACKETIZED_HEADER_BODY_LEN_OFFSET];
+ if (in.len != PACKETIZED_HEADER_BYTES + body_len) continue;
+
+ const uint8_t* body = &in.buf[PACKETIZED_HEADER_BYTES];
+ uint64_t now = in.utime == 0 ? zt->time(zt->time_usr) : in.utime;
+
+ if (type == PACKETIZED_MSG_METADATA) {
+ ret = begin_rx_session(zt, in.channel, session_id, body, body_len, now);
+ } else if (type == PACKETIZED_MSG_DATA) {
+ ret = process_rx_data(zt, session_id, body, body_len, now);
+ } else if (type == PACKETIZED_MSG_RETRANS_REQ) {
+ ret = process_retrans_request(zt, session_id, body, body_len);
+ } else {
+ continue;
+ }
+
+ if (ret != ZCM_EOK && ret != ZCM_EINVALID) return ret;
+ }
+
+ return ZCM_EOK;
+}
+
+static int packetized_serial_recvmsg(zcm_trans_packetized_serial_t* zt, zcm_msg_t* msg,
+ unsigned timeoutMs)
+{
+ (void)timeoutMs;
+
+ if (zt->out_pending) {
+ deliver_pending(zt, msg);
+ return ZCM_EOK;
+ }
+
+ while (1) {
+ int ret = process_incoming_messages(zt);
+ if (ret == ZCM_EAGAIN) {
+ maybe_schedule_retrans_request(zt, zt->time(zt->time_usr));
+ ret = send_retrans_request(zt);
+ if (ret != ZCM_EOK) return ret;
+ return ZCM_EAGAIN;
+ }
+ if (ret != ZCM_EOK) return ret;
+
+ deliver_pending(zt, msg);
+ return ZCM_EOK;
+ }
+}
+
+int packetized_serial_update_rx(zcm_trans_t* _zt)
+{
+ zcm_trans_packetized_serial_t* zt = cast(_zt);
+ int ret = serial_update_rx(zt->inner);
+ if (ret != ZCM_EOK) return ret;
+
+ ret = process_incoming_messages(zt);
+ if (ret != ZCM_EOK && ret != ZCM_EAGAIN) return ret;
+
+ maybe_schedule_retrans_request(zt, zt->time(zt->time_usr));
+ return send_retrans_request(zt);
+}
+
+int packetized_serial_update_tx(zcm_trans_t* _zt)
+{
+ zcm_trans_packetized_serial_t* zt = cast(_zt);
+ int ret = send_pending_retransmissions(zt);
+ if (ret != ZCM_EOK) return ret;
+ return serial_update_tx(zt->inner);
+}
+
+static size_t _packetized_serial_get_mtu(zcm_trans_t* zt)
+{
+ return packetized_serial_get_mtu(cast(zt));
+}
+
+static int _packetized_serial_sendmsg(zcm_trans_t* zt, zcm_msg_t msg)
+{
+ return packetized_serial_sendmsg(cast(zt), msg);
+}
+
+static int _packetized_serial_recvmsg_enable(zcm_trans_t* zt, const char* channel,
+ bool enable)
+{
+ return packetized_serial_recvmsg_enable(cast(zt), channel, enable);
+}
+
+static int _packetized_serial_recvmsg(zcm_trans_t* zt, zcm_msg_t* msg, unsigned timeoutMs)
+{
+ return packetized_serial_recvmsg(cast(zt), msg, timeoutMs);
+}
+
+static int _packetized_serial_update(zcm_trans_t* zt)
+{
+ int rxRet = packetized_serial_update_rx(zt);
+ int txRet = packetized_serial_update_tx(zt);
+ return rxRet == ZCM_EOK ? txRet : rxRet;
+}
+
+static zcm_trans_methods_t methods = {
+ &_packetized_serial_get_mtu,
+ &_packetized_serial_sendmsg,
+ &_packetized_serial_recvmsg_enable,
+ &_packetized_serial_recvmsg,
+ NULL,
+ &_packetized_serial_update,
+ &zcm_trans_packetized_serial_destroy,
+};
+
+static zcm_trans_packetized_serial_t* cast(zcm_trans_t* zt)
+{
+ assert(zt->vtbl == &methods);
+ return (zcm_trans_packetized_serial_t*)zt;
+}
+
+zcm_trans_t* zcm_trans_packetized_serial_create(
+ size_t (*get)(uint8_t* data, size_t nData, void* usr),
+ size_t (*put)(const uint8_t* data, size_t nData, void* usr), void* put_get_usr,
+ uint64_t (*timestamp_now)(void* usr), void* time_usr, size_t MTU, size_t bufSize,
+ uint8_t tx_packet_data_size, size_t max_message_size)
+{
+ zcm_trans_packetized_serial_t* zt = calloc(1, sizeof(*zt));
+ if (zt == NULL) return NULL;
+
+ zt->inner = zcm_trans_generic_serial_create(get, put, put_get_usr, timestamp_now,
+ time_usr, MTU, bufSize);
+ if (zt->inner == NULL) {
+ free(zt);
+ return NULL;
+ }
+
+ zt->inner_mtu = zcm_trans_get_mtu(zt->inner);
+ if (zt->inner_mtu < PACKETIZED_HEADER_BYTES + PACKETIZED_DATA_OVERHEAD_BYTES +
+ PACKETIZED_MIN_DATA_PAYLOAD_BYTES ||
+ zt->inner_mtu < PACKETIZED_HEADER_BYTES + PACKETIZED_METADATA_BODY_BYTES) {
+ zcm_trans_generic_serial_destroy(zt->inner);
+ free(zt);
+ return NULL;
+ }
+
+ zt->dynamic = (max_message_size == ZCM_TRANS_PACKETIZED_SERIAL_GROW_DYNAMICALLY);
+ zt->max_message_size = zt->dynamic ? PACKETIZED_DEFAULT_MAX_MESSAGE_SIZE : max_message_size;
+
+ if (tx_packet_data_size == 0) {
+ size_t max_payload =
+ zt->inner_mtu - PACKETIZED_HEADER_BYTES - PACKETIZED_DATA_OVERHEAD_BYTES;
+ if (max_payload > PACKETIZED_MAX_PACKET_DATA_SIZE)
+ max_payload = PACKETIZED_MAX_PACKET_DATA_SIZE;
+ zt->configured_packet_data_size = (uint8_t)max_payload;
+ } else {
+ zt->configured_packet_data_size = tx_packet_data_size;
+ }
+
+ if (zt->configured_packet_data_size == 0 ||
+ zt->configured_packet_data_size > PACKETIZED_MAX_PACKET_DATA_SIZE) {
+ zcm_trans_generic_serial_destroy(zt->inner);
+ free(zt);
+ return NULL;
+ }
+
+ if (PACKETIZED_HEADER_BYTES + PACKETIZED_DATA_OVERHEAD_BYTES +
+ zt->configured_packet_data_size >
+ zt->inner_mtu) {
+ zcm_trans_generic_serial_destroy(zt->inner);
+ free(zt);
+ return NULL;
+ }
+
+ zt->mtu = packetized_max_mtu(zt->configured_packet_data_size);
+ if (!zt->dynamic && zt->mtu > zt->max_message_size)
+ zt->mtu = zt->max_message_size;
+
+ zt->pkt_buf_size = zt->inner_mtu;
+ zt->pkt_buf = malloc(zt->pkt_buf_size);
+ if (zt->pkt_buf == NULL) {
+ zcm_trans_generic_serial_destroy(zt->inner);
+ free(zt);
+ return NULL;
+ }
+
+ zt->out_buf_size = zt->inner_mtu > zt->max_message_size ? zt->inner_mtu : zt->max_message_size;
+ zt->out_buf = malloc(zt->out_buf_size == 0 ? 1 : zt->out_buf_size);
+ if (zt->out_buf == NULL) {
+ free(zt->pkt_buf);
+ zcm_trans_generic_serial_destroy(zt->inner);
+ free(zt);
+ return NULL;
+ }
+ zt->out_len = 0;
+ zt->out_utime = 0;
+ zt->out_channel[0] = '\0';
+ zt->out_pending = 0;
+
+ zt->rx.packet_capacity = zt->max_message_size;
+ zt->rx.data = malloc(zt->max_message_size == 0 ? 1 : zt->max_message_size);
+ zt->rx.packet_received = calloc(zt->rx.packet_capacity, sizeof(uint8_t));
+ zt->rx.missing_ids = malloc(zt->rx.packet_capacity * sizeof(uint16_t));
+ zt->tx.data = malloc(zt->max_message_size == 0 ? 1 : zt->max_message_size);
+ zt->tx.packet_capacity = zt->rx.packet_capacity;
+ zt->tx.retrans_ids = malloc(zt->tx.packet_capacity * sizeof(uint16_t));
+ if (zt->rx.data == NULL || zt->rx.packet_received == NULL || zt->rx.missing_ids == NULL ||
+ zt->tx.data == NULL || zt->tx.retrans_ids == NULL) {
+ free(zt->tx.retrans_ids);
+ free(zt->tx.data);
+ free(zt->rx.missing_ids);
+ free(zt->rx.packet_received);
+ free(zt->rx.data);
+ free(zt->out_buf);
+ free(zt->pkt_buf);
+ zcm_trans_generic_serial_destroy(zt->inner);
+ free(zt);
+ return NULL;
+ }
+ rx_clear(&zt->rx);
+ tx_clear(&zt->tx);
+
+ zt->time = timestamp_now;
+ zt->time_usr = time_usr;
+
+ zt->trans.trans_type = ZCM_NONBLOCKING;
+ zt->trans.vtbl = &methods;
+ return (zcm_trans_t*)zt;
+}
+
+void zcm_trans_packetized_serial_destroy(zcm_trans_t* _zt)
+{
+ zcm_trans_packetized_serial_t* zt = cast(_zt);
+ if (zt->inner) zcm_trans_generic_serial_destroy(zt->inner);
+ free(zt->pkt_buf);
+ free(zt->out_buf);
+ free(zt->rx.missing_ids);
+ free(zt->rx.packet_received);
+ free(zt->rx.data);
+ free(zt->tx.retrans_ids);
+ free(zt->tx.data);
+ free(zt);
+}
diff --git a/zcm/transport/packetized_serial_transport.h b/zcm/transport/packetized_serial_transport.h
new file mode 100644
index 00000000..baff3905
--- /dev/null
+++ b/zcm/transport/packetized_serial_transport.h
@@ -0,0 +1,42 @@
+#ifndef _ZCM_TRANS_PACKETIZED_SERIAL_H
+#define _ZCM_TRANS_PACKETIZED_SERIAL_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include
+
+#include "zcm/transport.h"
+#include "zcm/zcm.h"
+#include "packetized_serial_protocol.h"
+
+/*
+ * Channel name convention
+ * -----------------------
+ * The packetized transport uses the leading '$' character as a signal that a
+ * message should be fragmented and reassembled using the packetization protocol.
+ * Any channel whose name begins with '$' is treated as a packetized channel;
+ * the '$' is stripped from the channel name on delivery to the receiver.
+ * Channels without a leading '$' are passed through to the underlying generic
+ * serial transport unchanged and are not packetized.
+ */
+
+#define ZCM_TRANS_PACKETIZED_SERIAL_GROW_DYNAMICALLY 0
+
+zcm_trans_t* zcm_trans_packetized_serial_create(
+ size_t (*get)(uint8_t* data, size_t nData, void* usr),
+ size_t (*put)(const uint8_t* data, size_t nData, void* usr), void* put_get_usr,
+ uint64_t (*timestamp_now)(void* usr), void* time_usr, size_t MTU, size_t bufSize,
+ uint8_t tx_packet_data_size, size_t max_message_size);
+
+void zcm_trans_packetized_serial_destroy(zcm_trans_t* zt);
+
+int packetized_serial_update_rx(zcm_trans_t* zt);
+int packetized_serial_update_tx(zcm_trans_t* zt);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _ZCM_TRANS_PACKETIZED_SERIAL_H */
diff --git a/zcm/transport/transport_can.cpp b/zcm/transport/transport_can.cpp
index 84090313..f0c17819 100644
--- a/zcm/transport/transport_can.cpp
+++ b/zcm/transport/transport_can.cpp
@@ -4,6 +4,7 @@
#include "zcm/util/debug.h"
#include "generic_serial_transport.h"
+#include "packetized_serial_transport.h"
#include "util/TimeUtil.hpp"
@@ -33,11 +34,26 @@
using namespace std;
+static bool parsePacketDataSize(const string* opt, uint8_t& out)
+{
+ if (!opt) {
+ out = 0;
+ return true;
+ }
+ char* endptr;
+ unsigned long parsed = strtoul(opt->c_str(), &endptr, 10);
+ if (*endptr != '\0' || parsed == 0 || parsed > 253) return false;
+ out = (uint8_t)parsed;
+ return true;
+}
+
+
struct ZCM_TRANS_CLASSNAME : public zcm_trans_t
{
unordered_map options;
uint32_t msgId;
uint32_t txId;
+ uint8_t packetDataSize;
string address;
int soc = -1;
@@ -46,6 +62,8 @@ struct ZCM_TRANS_CLASSNAME : public zcm_trans_t
struct ifreq ifr;
zcm_trans_t* gst = nullptr;
+ int (*gst_update_rx)(zcm_trans_t*) = nullptr;
+ int (*gst_update_tx)(zcm_trans_t*) = nullptr;
uint64_t recvTimeoutUs = 0;
uint64_t recvMsgStartUtime = 0;
@@ -72,6 +90,12 @@ struct ZCM_TRANS_CLASSNAME : public zcm_trans_t
options[opts->name[i]] = opts->value[i];
msgId = 0;
+ packetDataSize = 0;
+ if (!parsePacketDataSize(findOption("pkt_size"), packetDataSize)) {
+ ZCM_DEBUG("Invalid pkt_size. Expected integer in [1,253]");
+ return;
+ }
+
auto* msgIdStr = findOption("msgid");
if (!msgIdStr) {
ZCM_DEBUG("Msg Id unspecified");
@@ -155,18 +179,34 @@ struct ZCM_TRANS_CLASSNAME : public zcm_trans_t
return;
}
- gst = zcm_trans_generic_serial_create(&ZCM_TRANS_CLASSNAME::get,
- &ZCM_TRANS_CLASSNAME::put,
- this,
- &ZCM_TRANS_CLASSNAME::timestamp_now,
- this,
- MTU, MTU * 10);
+ if (packetDataSize != 0) {
+ gst = zcm_trans_packetized_serial_create(&ZCM_TRANS_CLASSNAME::get,
+ &ZCM_TRANS_CLASSNAME::put,
+ this,
+ &ZCM_TRANS_CLASSNAME::timestamp_now,
+ this,
+ MTU, MTU * 10,
+ packetDataSize,
+ ZCM_TRANS_PACKETIZED_SERIAL_GROW_DYNAMICALLY);
+ gst_update_rx = packetized_serial_update_rx;
+ gst_update_tx = packetized_serial_update_tx;
+ } else {
+ gst = zcm_trans_generic_serial_create(&ZCM_TRANS_CLASSNAME::get,
+ &ZCM_TRANS_CLASSNAME::put,
+ this,
+ &ZCM_TRANS_CLASSNAME::timestamp_now,
+ this,
+ MTU, MTU * 10);
+ gst_update_rx = serial_update_rx;
+ gst_update_tx = serial_update_tx;
+ }
+ if (!gst) return;
socSettingsGood = true;
}
~ZCM_TRANS_CLASSNAME()
{
- if (gst) zcm_trans_generic_serial_destroy(gst);
+ if (gst) zcm_trans_destroy(gst);
if (soc != -1 && close(soc) < 0) {
ZCM_DEBUG("Failed to close");
}
@@ -277,7 +317,7 @@ struct ZCM_TRANS_CLASSNAME : public zcm_trans_t
{
int ret = zcm_trans_sendmsg(this->gst, msg);
if (ret != ZCM_EOK) return ret;
- return serial_update_tx(this->gst);
+ return this->gst_update_tx(this->gst);
}
int recvmsgEnable(const char* channel, bool enable)
@@ -309,7 +349,7 @@ struct ZCM_TRANS_CLASSNAME : public zcm_trans_t
return ZCM_EUNKNOWN;
}
- serial_update_rx(this->gst);
+ this->gst_update_rx(this->gst);
} while (true);
return ZCM_EAGAIN;
}
@@ -366,5 +406,6 @@ static zcm_trans_t *create(zcm_url_t* url, char **opt_errmsg)
#ifdef USING_TRANS_CAN
const TransportRegister ZCM_TRANS_CLASSNAME::reg(
"can", "Transfer data via a socket CAN connection on a single id "
- "(e.g. 'can://can0?msgid=65536&rx_extended_addr=standard&tx_extended_addr=true')", create);
+ "(e.g. 'can://can0?msgid=65536&rx_extended_addr=standard&tx_extended_addr=true' or "
+ "'can://can0?msgid=65536&pkt_size=8')", create);
#endif
diff --git a/zcm/transport/transport_serial.cpp b/zcm/transport/transport_serial.cpp
index b0cb8e06..86d375ed 100644
--- a/zcm/transport/transport_serial.cpp
+++ b/zcm/transport/transport_serial.cpp
@@ -5,6 +5,7 @@
#include "zcm/util/debug.h"
#include "generic_serial_transport.h"
+#include "packetized_serial_transport.h"
#include "util/TimeUtil.hpp"
@@ -43,6 +44,20 @@ using u16 = uint16_t;
using u32 = uint32_t;
using u64 = uint64_t;
+static bool parsePacketDataSize(const string* opt, uint8_t& out)
+{
+ if (!opt) {
+ out = 0;
+ return true;
+ }
+ char* endptr;
+ unsigned long parsed = strtoul(opt->c_str(), &endptr, 10);
+ if (*endptr != '\0' || parsed == 0 || parsed > 253) return false;
+ out = (uint8_t)parsed;
+ return true;
+}
+
+
struct Serial
{
Serial(){}
@@ -242,7 +257,7 @@ struct ZCM_TRANS_CLASSNAME : public zcm_trans_t
int baud;
bool hwFlowControl;
-
+ uint8_t packetDataSize;
bool raw;
string rawChan;
int rawSize;
@@ -253,6 +268,8 @@ struct ZCM_TRANS_CLASSNAME : public zcm_trans_t
unordered_map options;
zcm_trans_t* gst;
+ int (*gst_update_rx)(zcm_trans_t*) = nullptr;
+ int (*gst_update_tx)(zcm_trans_t*) = nullptr;
uint64_t timeoutLeftUs;
@@ -298,6 +315,11 @@ struct ZCM_TRANS_CLASSNAME : public zcm_trans_t
}
}
+ packetDataSize = 0;
+ if (!parsePacketDataSize(findOption("pkt_size"), packetDataSize)) {
+ ZCM_DEBUG("expected integer argument in [1,253] for 'pkt_size'");
+ return;
+ }
raw = false;
auto* rawStr = findOption("raw");
if (rawStr) {
@@ -310,6 +332,10 @@ struct ZCM_TRANS_CLASSNAME : public zcm_trans_t
return;
}
}
+ if (raw && packetDataSize != 0) {
+ ZCM_DEBUG("'raw' and 'pkt_size' options are mutually exclusive");
+ return;
+ }
rawChan = "";
auto* rawChanStr = findOption("raw_channel");
@@ -333,6 +359,17 @@ struct ZCM_TRANS_CLASSNAME : public zcm_trans_t
if (raw) {
rawBuf.reset(new uint8_t[rawSize]);
gst = nullptr;
+ } else if (packetDataSize != 0) {
+ gst = zcm_trans_packetized_serial_create(&ZCM_TRANS_CLASSNAME::get,
+ &ZCM_TRANS_CLASSNAME::put,
+ this,
+ &ZCM_TRANS_CLASSNAME::timestamp_now,
+ nullptr,
+ MTU, MTU * 10,
+ packetDataSize,
+ ZCM_TRANS_PACKETIZED_SERIAL_GROW_DYNAMICALLY);
+ gst_update_rx = packetized_serial_update_rx;
+ gst_update_tx = packetized_serial_update_tx;
} else {
gst = zcm_trans_generic_serial_create(&ZCM_TRANS_CLASSNAME::get,
&ZCM_TRANS_CLASSNAME::put,
@@ -340,18 +377,20 @@ struct ZCM_TRANS_CLASSNAME : public zcm_trans_t
&ZCM_TRANS_CLASSNAME::timestamp_now,
nullptr,
MTU, MTU * 10);
+ gst_update_rx = serial_update_rx;
+ gst_update_tx = serial_update_tx;
}
}
~ZCM_TRANS_CLASSNAME()
{
ser.close();
- if (gst) zcm_trans_generic_serial_destroy(gst);
+ if (gst) zcm_trans_destroy(gst);
}
bool good()
{
- return ser.isOpen();
+ return ser.isOpen() && (raw || gst != nullptr);
}
static size_t get(uint8_t* data, size_t nData, void* usr)
@@ -389,7 +428,7 @@ struct ZCM_TRANS_CLASSNAME : public zcm_trans_t
// and touch no variables related to receiving
int ret = zcm_trans_sendmsg(this->gst, msg);
if (ret != ZCM_EOK) return ret;
- return serial_update_tx(this->gst);
+ return this->gst_update_tx(this->gst);
}
}
@@ -427,7 +466,7 @@ struct ZCM_TRANS_CLASSNAME : public zcm_trans_t
// `get` knows how long it has to exit
timeoutLeftUs = timeoutLeftUs > diff ? timeoutLeftUs - diff : 0;
- serial_update_rx(this->gst);
+ this->gst_update_rx(this->gst);
diff = TimeUtil::utime() - startUtime;
timeoutLeftUs = timeoutLeftUs > diff ? timeoutLeftUs - diff : 0;
@@ -489,7 +528,8 @@ static zcm_trans_t* create(zcm_url_t* url, char **opt_errmsg)
// Register this transport with ZCM
const TransportRegister ZCM_TRANS_CLASSNAME::reg(
"serial", "Transfer data via a serial connection "
- "(e.g. 'serial:///dev/ttyUSB0?baud=115200&hw_flow_control=true' or "
- "'serial:///dev/pts/10?raw=true&raw_channel=RAW_SERIAL')",
+ "(e.g. 'serial:///dev/ttyUSB0?baud=115200&hw_flow_control=true', "
+ "'serial:///dev/ttyUSB0?baud=115200&pkt_size=128', or "
+ "'serial:///dev/pts/10?raw=true&raw_channel=RAW_SERIAL')",
create);
#endif
diff --git a/zcm/transport_registrar.c b/zcm/transport_registrar.c
index 7852d295..5bbfc1c8 100644
--- a/zcm/transport_registrar.c
+++ b/zcm/transport_registrar.c
@@ -35,9 +35,44 @@ zcm_trans_create_func *zcm_transport_find(const char *name)
void zcm_transport_help(FILE *f)
{
+ const int name_width = 20;
+ const int desc_width = 120 - name_width - 1;
+
fprintf(f, "Transport Name Description\n");
fprintf(f, "---------------------------------------------------------------------\n");
for (size_t i = 0; i < t_index; i++) {
- fprintf(f, "%-20s %s\n", t_name[i], t_desc[i]);
+ const char *desc = t_desc[i];
+ size_t len = strlen(desc);
+ size_t pos = 0;
+ bool first_line = true;
+
+ do {
+ if (first_line) {
+ fprintf(f, "%-*s ", name_width, t_name[i]);
+ first_line = false;
+ } else {
+ fprintf(f, "%*s", name_width + 1, "");
+ }
+
+ // Check if remaining description will fit
+ size_t remaining = len - pos;
+ if (remaining <= (size_t)desc_width) {
+ fprintf(f, "%s\n", desc + pos);
+ break;
+ }
+
+ // Look for natural boundary to break on
+ size_t brk = (size_t)desc_width;
+ while (brk > 0 && desc[pos + brk] != ' ')
+ brk--;
+ if (brk == 0)
+ brk = (size_t)desc_width;
+
+ fprintf(f, "%.*s\n", (int)brk, desc + pos);
+ pos += brk;
+ while (pos < len && desc[pos] == ' ')
+ pos++;
+ } while (pos < len);
+ fprintf(f, "\n");
}
}
diff --git a/zcm/wscript b/zcm/wscript
index 1fadddde..87a8f2c5 100644
--- a/zcm/wscript
+++ b/zcm/wscript
@@ -48,6 +48,9 @@ def build(ctx):
'zcm_coretypes.h', 'transport.h', 'nonblocking.h', 'nonblocking.c',
'transport/generic_serial_transport.h',
'transport/generic_serial_transport.c',
+ 'transport/packetized_serial_protocol.h',
+ 'transport/packetized_serial_transport.h',
+ 'transport/packetized_serial_transport.c',
'transport/generic_serial_circ_buff.h',
'transport/generic_serial_circ_buff.c',
'transport/generic_serial_fletcher.h']
@@ -103,9 +106,11 @@ def build(ctx):
['json/json.h', 'json/json-forwards.h'])
ctx.install_files('${PREFIX}/include/zcm/transport',
- ['transport/generic_serial_transport.h',
- 'transport/generic_serial_circ_buff.h',
- 'transport/generic_serial_fletcher.h'])
+ ['transport/generic_serial_transport.h',
+ 'transport/packetized_serial_protocol.h',
+ 'transport/packetized_serial_transport.h',
+ 'transport/generic_serial_circ_buff.h',
+ 'transport/generic_serial_fletcher.h'])
ctx.install_files('${PREFIX}/share/embedded', ['zcm-embed.tar.gz'])