diff --git a/examples/local_rdma_ops/atomic_gpu.cpp b/examples/local_rdma_ops/atomic_gpu.cpp index 1ccdeea2..2cf13096 100644 --- a/examples/local_rdma_ops/atomic_gpu.cpp +++ b/examples/local_rdma_ops/atomic_gpu.cpp @@ -72,18 +72,24 @@ __device__ void SendThreadKernel(RdmaEndpoint& epSend, RdmaMemoryRegion sendMr, } __device__ void RecvThreadKernel(RdmaEndpoint& epRecv, RdmaMemoryRegion mr) { - uint32_t postIdx = 0; uint32_t* addr = reinterpret_cast(mr.addr); uint32_t val = core::AtomicLoadSeqCst(addr); printf("val = %u\n", val); - while (val != 2) { + + // Cross-block, lock-free observation: there is no sync between the send block + // (which issues CAS then FETCH_ADD) and this recv block, so by the time we + // start polling either both atomics or only CAS may have landed. Just wait + // until the value leaves its initial 0, and then until it reaches the final + // expected sum (CAS_swap + FETCH_ADD_value == 2 + 2 == 4). + while (val == 0) { val = core::AtomicLoadSeqCst(addr); - printf("after compare and swap val = %u\n", val); } + printf("after compare and swap val = %u\n", val); + while (val != 4) { val = core::AtomicLoadSeqCst(addr); - printf("after fetch add val = %u\n", val); } + printf("after fetch add val = %u\n", val); } __global__ void SendRecvOnGpu(RdmaEndpoint& epSend, RdmaEndpoint& epRecv, RdmaMemoryRegion mrSend, diff --git a/examples/local_rdma_ops/send_recv_gpu.cpp b/examples/local_rdma_ops/send_recv_gpu.cpp index a880fe52..12c80264 100644 --- a/examples/local_rdma_ops/send_recv_gpu.cpp +++ b/examples/local_rdma_ops/send_recv_gpu.cpp @@ -53,9 +53,17 @@ __device__ void SendThreadKernel(RdmaEndpoint& epSend, RdmaMemoryRegion mr, int printf("RingDoorbell is done\n"); __threadfence_system(); - int snd_opcode = - PollCq

(epSend.cqHandle.cqAddr, epSend.cqHandle.cqeNum, &epSend.cqHandle.consIdx); - printf("send PollCq is done\n"); + // PSD 4-arg PollCq is non-blocking (returns -1 when the CCQE msg_msn isn't + // there yet); BNXT/MLX5 spin internally. Wrap in a busy-wait loop so this + // example works uniformly across all providers. + uint32_t snd_wqeIdx = 0; + int snd_opcode; + do { + snd_opcode = PollCq

(epSend.cqHandle.cqAddr, epSend.cqHandle.cqeNum, + &epSend.cqHandle.consIdx, &snd_wqeIdx); + } while (snd_opcode < 0); + epSend.cqHandle.consIdx += 1; + printf("send PollCq is done, wqeIdx %u\n", snd_wqeIdx); UpdateCqDbrRecord

(epSend.cqHandle, epSend.cqHandle.consIdx); printf("send UpdateCqDbrRecord is done\n"); // printf("snd_opcode %d val %d\n", snd_opcode, reinterpret_cast(mrSend.addr)[0]); @@ -84,9 +92,14 @@ __device__ void RecvThreadKernel(RdmaEndpoint& epRecv, RdmaMemoryRegion mr, int printf("recv RingDoorbell is done\n"); } - int rcv_opcode = - PollCq

(epRecv.cqHandle.cqAddr, epRecv.cqHandle.cqeNum, &epRecv.cqHandle.consIdx); - printf("recv PollCq is done\n"); + uint32_t rcv_wqeIdx = 0; + int rcv_opcode; + do { + rcv_opcode = PollCq

(epRecv.cqHandle.cqAddr, epRecv.cqHandle.cqeNum, + &epRecv.cqHandle.consIdx, &rcv_wqeIdx); + } while (rcv_opcode < 0); + epRecv.cqHandle.consIdx += 1; + printf("recv PollCq is done, wqeIdx %u\n", rcv_wqeIdx); UpdateCqDbrRecord

(epRecv.cqHandle, epRecv.cqHandle.consIdx); printf("recv UpdateCqDbrRecord is done\n"); diff --git a/examples/local_rdma_ops/write_inline_gpu.cpp b/examples/local_rdma_ops/write_inline_gpu.cpp index f54cc7a4..bcdd1cbe 100644 --- a/examples/local_rdma_ops/write_inline_gpu.cpp +++ b/examples/local_rdma_ops/write_inline_gpu.cpp @@ -53,11 +53,19 @@ __device__ void SendThreadKernel(RdmaEndpoint& epSend, RdmaMemoryRegion mr) { RingDoorbell

(epSend.wqHandle.dbrAddr, dbr_val); __threadfence_system(); - int opcode = - PollCq

(epSend.cqHandle.cqAddr, epSend.cqHandle.cqeNum, &epSend.cqHandle.consIdx); + // PSD 4-arg PollCq is non-blocking (returns -1 when CQE / CCQE msg_msn not + // ready yet), while BNXT/MLX5 internally spin. Wrap in a busy-wait loop so + // this example works uniformly across all providers. + uint32_t wqeIdx = 0; + int opcode; + do { + opcode = PollCq

(epSend.cqHandle.cqAddr, epSend.cqHandle.cqeNum, &epSend.cqHandle.consIdx, + &wqeIdx); + } while (opcode < 0); + epSend.cqHandle.consIdx += 1; __threadfence_system(); UpdateCqDbrRecord

(epSend.cqHandle, epSend.cqHandle.consIdx); - // printf("round %d snd_opcode %d\n", i, opcode); + // printf("round %d snd_opcode %d wqeIdx %u\n", i, opcode, wqeIdx); raddr += i; } diff --git a/include/mori/application/transport/rdma/providers/dv_loader.hpp b/include/mori/application/transport/rdma/providers/dv_loader.hpp index d008effe..fe91b35e 100644 --- a/include/mori/application/transport/rdma/providers/dv_loader.hpp +++ b/include/mori/application/transport/rdma/providers/dv_loader.hpp @@ -181,6 +181,8 @@ struct IonicDvApi { using pd_set_sqcmb_t = int (*)(struct ibv_pd*, bool, bool, bool); using pd_set_rqcmb_t = int (*)(struct ibv_pd*, bool, bool, bool); using pd_set_udma_mask_t = int (*)(struct ibv_pd*, uint32_t); + using create_cq_ex_t = struct ibv_cq_ex* (*)(struct ibv_context*, struct ibv_cq_init_attr_ex*, + struct ionic_cq_init_attr_ex*); get_ctx_t get_ctx = nullptr; qp_get_udma_idx_t qp_get_udma_idx = nullptr; @@ -189,6 +191,7 @@ struct IonicDvApi { pd_set_sqcmb_t pd_set_sqcmb = nullptr; pd_set_rqcmb_t pd_set_rqcmb = nullptr; pd_set_udma_mask_t pd_set_udma_mask = nullptr; + create_cq_ex_t create_cq_ex = nullptr; void* handle = nullptr; @@ -203,7 +206,9 @@ struct IonicDvApi { pd_set_sqcmb = (pd_set_sqcmb_t)DvLoadSymbol(handle, "ionic_dv_pd_set_sqcmb"); pd_set_rqcmb = (pd_set_rqcmb_t)DvLoadSymbol(handle, "ionic_dv_pd_set_rqcmb"); pd_set_udma_mask = (pd_set_udma_mask_t)DvLoadSymbol(handle, "ionic_dv_pd_set_udma_mask"); + create_cq_ex = (create_cq_ex_t)DvLoadSymbol(handle, "ionic_dv_create_cq_ex"); + // create_cq_ex is optional: nullptr means CCQE not supported by this driver version return get_ctx && qp_get_udma_idx && get_cq && get_qp && pd_set_sqcmb && pd_set_rqcmb && pd_set_udma_mask; } diff --git a/include/mori/core/transport/rdma/providers/ionic/ionic_defs.hpp b/include/mori/core/transport/rdma/providers/ionic/ionic_defs.hpp index 2e79e875..9d3cd9be 100644 --- a/include/mori/core/transport/rdma/providers/ionic/ionic_defs.hpp +++ b/include/mori/core/transport/rdma/providers/ionic/ionic_defs.hpp @@ -26,7 +26,6 @@ namespace core { #define QUEUE_SIZE 1 #define MAX_INLINE_SIZE 32 -// #define IONIC_CCQE 1 -#undef IONIC_CCQE + } // namespace core } // namespace mori diff --git a/include/mori/core/transport/rdma/providers/ionic/ionic_device_primitives.hpp b/include/mori/core/transport/rdma/providers/ionic/ionic_device_primitives.hpp index f00f0ba9..f4397412 100644 --- a/include/mori/core/transport/rdma/providers/ionic/ionic_device_primitives.hpp +++ b/include/mori/core/transport/rdma/providers/ionic/ionic_device_primitives.hpp @@ -466,101 +466,21 @@ inline __device__ void UpdateDbrAndRingDbRecv(void* dbrRecAdd /* ---------------------------------------------------------------------------------------------- */ #ifdef IONIC_CCQE template <> -inline __device__ int PollCqOnce(void* cqeAddr, uint32_t cqeNum, - uint32_t consIdx, uint32_t* wqeIdx) { - volatile struct ionic_v1_cqe* cqe = reinterpret_cast(cqeAddr); - uint32_t old, msn = HTOBE32(cqe->send.msg_msn); - - MORI_PRINTF("ABH %s:%d here cons %#x msn %#x\n", __func__, __LINE__, consIdx, msn); - while ((msn - consIdx) & 0x800000) { - old = msn; - msn = HTOBE32(cqe->send.msg_msn); - if (msn != old) { - MORI_PRINTF("ABH %s:%d here cons %#x msn %#x\n", __func__, __LINE__, consIdx, msn); - } - } - MORI_PRINTF("ABH %s:%d here - msn %#x\n", __func__, __LINE__, msn); - - *wqeIdx = msn; - - return 0; -} -#else -template <> -inline __device__ int PollCqOnce(void* cqeAddr, uint32_t cqeNum, - uint32_t consIdx, uint32_t* wqeIdx) { - uint32_t cqeIdx = consIdx & (cqeNum - 1); - char* Addr = reinterpret_cast(cqeAddr) + (cqeIdx * sizeof(struct ionic_v1_cqe)); - struct ionic_v1_cqe* cqe = reinterpret_cast(Addr); - - MORI_PRINTF("ABH %s:%d consIdx:%u, cqeIdx:%u, cqeAddr:%p, qtf_be:0x%08x, cqe->status_length:%d\n", - __func__, __LINE__, consIdx, cqeIdx, Addr, - *(volatile uint32_t*)(&cqe->qid_type_flags), HTOBE32(cqe->status_length)); -#if 1 - MORI_PRINTF("dump cqe at addr:%p\n", Addr); - for (int i = 0; i < 32; i++) { - MORI_PRINTF("%02x", (unsigned char)Addr[i]); - if ((i + 1) % 4 == 0) MORI_PRINTF("\n"); - } -#endif - /* Determine expected color based on cq wrap count */ - uint32_t qtf_color_bit = HTOBE32(IONIC_V1_CQE_COLOR); - uint32_t qtf_color_exp = qtf_color_bit; - if (cqeIdx & cqeNum) { - qtf_color_exp = 0; - } - - /* Check if my cqe color == expected color */ - // first round: 1 == 1, second round: 0 == 0 - uint32_t qtf_be = *(volatile uint32_t*)(&cqe->qid_type_flags); - if ((qtf_be & qtf_color_bit) != qtf_color_exp) { - MORI_PRINTF("cqe not ready\n"); - return -1; // CQE just not ready yet, try again - } +inline __device__ int PollCq(void* cqAddr, uint32_t cqeNum, uint32_t* consIdx, + uint32_t* wqeCounter) { + const uint32_t curConsIdx = *consIdx; - uint32_t msn = HTOBE32(cqe->send.msg_msn); - - /* Report if the completion indicates an error. */ - if (!!(qtf_be & HTOBE32(IONIC_V1_CQE_ERROR))) { - uint32_t qtf = HTOBE32(qtf_be); - uint32_t qid = qtf >> IONIC_V1_CQE_QID_SHIFT; - uint32_t type = (qtf >> IONIC_V1_CQE_TYPE_SHIFT) & IONIC_V1_CQE_TYPE_MASK; - uint32_t flag = qtf & 0xf; - uint32_t status = cqe->status_length; - uint64_t npg = cqe->send.npg_wqe_idx_timestamp & IONIC_V1_CQE_WQE_IDX_MASK; - MORI_PRINTF("QUIET ERROR: qid %u type %u flag %#x status %u msn %u npg %lu\n", qid, type, flag, - status, msn, npg); - return HTOBE32(cqe->status_length); + volatile struct ionic_v1_cqe* cqe = reinterpret_cast(cqAddr); + const uint32_t msn = BE32TOH(*(volatile uint32_t*)(&cqe->send.msg_msn)); + if ((msn - (curConsIdx + 1)) & 0x800000) { + return -1; // firmware hasn't produced enough completions yet } - MORI_PRINTF("poll cqe one, success\n"); - + *wqeCounter = msn; return 0; } -#endif -template <> -inline __device__ int PollCq(void* cqAddr, uint32_t cqeNum, uint32_t* consIdx) { - const uint32_t curConsIdx = atomicAdd(consIdx, 1); - int err = -1; - - // ABH: polls until each thread sees a ready cqe - // (what if not all threads see a ready cqe?) - do { - err = PollCqOnce(cqAddr, cqeNum, curConsIdx, nullptr); - // TODO: Explain clearly why adding a compiler barrier fix hang issue - asm volatile("" ::: "memory"); - } while (err < 0); - - // Handle error cases - if (err) { - auto error = IonicHandleErrorCqe(err); - MORI_PRINTF("[IONIC PollCq] CQE error: %s (opcode: %d) at %s:%d\n", IbvWcStatusString(error), - err, __FILE__, __LINE__); - return err; - } - return 0; -} +#else template <> inline __device__ int PollCq(void* cqAddr, uint32_t cqeNum, uint32_t* consIdx, @@ -591,216 +511,13 @@ inline __device__ int PollCq(void* cqAddr, uint32_t cqeNum, u const uint32_t msn = BE32TOH(cqe->send.msg_msn) & 0xFFFF; const uint8_t error = IonicHandleErrorCqe(status); - // MORI_PRINTF( - // "PollCqOnce2, QUIET ERROR: block:%u, warp:%u, lane:%u, cqeAddr:%p, error:%u " - // "qid %u type %u flag %#x status 0x%08x msn %u npg %lu\n", - // blockIdx.x, threadIdx.x / warpSize, __lane_id(), cqeAddr, error, qid, type, flags, - // status, msn, npg); - -#if 0 - // Debug: dump raw CQE contents - MORI_PRINTF("dump cqe at addr:%p\n", cqeAddr); - for (int i = 0; i < 32; i++) { - MORI_PRINTF("%02x", static_cast(cqeAddr[i])); - if ((i + 1) % 4 == 0) { - MORI_PRINTF("\n"); - } - } -#endif - return error; } *wqeCounter = BE32TOH(cqe->send.msg_msn); return 0; } - -#ifdef IONIC_CCQE -inline __device__ int PollCqOnce2(WorkQueueHandle& wqHandle, CompletionQueueHandle& cqHandle, - uint64_t activemask, void* cqeAddr, uint32_t cqeNum, - uint32_t consIdx) { - volatile struct ionic_v1_cqe* cqe = reinterpret_cast(cqeAddr); - uint32_t old, msn = HTOBE32(cqe->send.msg_msn); - - consIdx = wqHandle.dbTouchIdx; - - // MORI_PRINTF("ABH %s:%d here cons %#x msn %#x\n", __func__, __LINE__, consIdx, msn); - while ((msn - consIdx) & 0x800000) { - old = msn; - msn = HTOBE32(cqe->send.msg_msn); - if (msn != old) { - // MORI_PRINTF("ABH %s:%d here cons %#x msn %#x\n", __func__, __LINE__, consIdx, msn); - } - } - - wqHandle.doneIdx = msn; - return 0; -} -#else -inline __device__ int PollCqOnce2(WorkQueueHandle& wqHandle, CompletionQueueHandle& cqHandle, - uint64_t activemask, void* cqeAddr, uint32_t cqeNum, - uint32_t consIdx) { - uint32_t my_logical_lane_id = get_active_lane_num(activemask); - uint32_t my_cq_pos = cqHandle.cq_consumer + my_logical_lane_id; - - uint32_t cqeIdx = my_cq_pos & (cqeNum - 1); - char* Addr = reinterpret_cast(cqeAddr) + (cqeIdx * sizeof(struct ionic_v1_cqe)); - - struct ionic_v1_cqe* cqe = reinterpret_cast(Addr); -#if 0 - MORI_PRINTF("PollCqOnce2, block:%u, warp:%u, lane:%u, consIdx:%u, cqeIdx:%u, cqeAddr:%p, qtf_be:0x%08x, cqe->status_length:%d, msn:%u\n", - blockIdx.x, threadIdx.x/warpSize, __lane_id(), my_cq_pos, cqeIdx, Addr, - *(volatile uint32_t *)(&cqe->qid_type_flags), BE32TOH(cqe->status_length), BE32TOH(cqe->send.msg_msn)); -#endif -#if 0 - MORI_PRINTF("dump cqe at addr:%p\n", Addr); - for (int i = 0; i < 32; i++) { - MORI_PRINTF("%02x", (unsigned char)Addr[i]); - if ((i+1)%4 == 0) - MORI_PRINTF("\n"); - } -#endif - /* Determine expected color based on cq wrap count */ - uint32_t qtf_color_bit = IONIC_V1_CQE_COLOR; - uint32_t qtf_color_exp = qtf_color_bit; - if (my_cq_pos & cqeNum) { - qtf_color_exp = 0; - } - - /* Check if my cqe color == expected color */ - // first round: 1 == 1, second round: 0 == 0 - uint32_t qtf_be = BE32TOH(*(volatile uint32_t*)(&cqe->qid_type_flags)); - if ((qtf_be & qtf_color_bit) != qtf_color_exp) { -#if 0 - MORI_PRINTF("PollCqOnce2, not ready, block:%u, warp:%u, lane:%u, consIdx:%u, cqeIdx:%u, cqeAddr:%p, qtf_be:0x%08x, cqe->status_length:0x%08x, msn:%u\n", - blockIdx.x, threadIdx.x/warpSize, __lane_id(), my_cq_pos, cqeIdx, Addr, - *(volatile uint32_t *)(&cqe->qid_type_flags), BE32TOH(cqe->status_length), BE32TOH(cqe->send.msg_msn)); -#endif - return 0; // CQE just not ready yet, try again - } - - uint32_t msn = BE32TOH(cqe->send.msg_msn); - - /* Report if the completion indicates an error. */ - if (!!(qtf_be & IONIC_V1_CQE_ERROR)) { - uint32_t qtf = qtf_be; - uint32_t qid = qtf >> IONIC_V1_CQE_QID_SHIFT; - uint32_t type = (qtf >> IONIC_V1_CQE_TYPE_SHIFT) & IONIC_V1_CQE_TYPE_MASK; - uint32_t flag = qtf & 0xf; - uint32_t status = cqe->status_length; - uint64_t npg = cqe->send.npg_wqe_idx_timestamp & IONIC_V1_CQE_WQE_IDX_MASK; - uint8_t error = IonicHandleErrorCqe(BE32TOH(cqe->status_length)); - MORI_PRINTF( - "PollCqOnce2, QUIET ERROR: block:%u, warp:%u, lane:%u, cqeAddr:%p, error:%u qid %u type %u " - "flag %#x status 0x%08x msn %u npg %lu\n", - blockIdx.x, threadIdx.x / warpSize, __lane_id(), Addr, error, qid, type, flag, status, msn, - npg); -#if 1 - MORI_PRINTF("dump cqe at addr:%p\n", Addr); - for (int i = 0; i < 32; i++) { - MORI_PRINTF("%02x", (unsigned char)Addr[i]); - if ((i + 1) % 4 == 0) MORI_PRINTF("\n"); - } -#endif - /* No other way to signal an error, so just crash. */ - // abort(); - return error; - } - -#if 0 - MORI_PRINTF("PollCqOnce2, success, block:%u, warp:%u, lane:%u, qp:%u, cqeAddr:%p, my_cq_pos:%u, cqeNum:%u, msn:%u\n", - blockIdx.x, threadIdx.x/warpSize, __lane_id(), - qtf_be >> IONIC_V1_CQE_QID_SHIFT, Addr, my_cq_pos, cqHandle.cqeNum, msn); -#endif - /* Only proceed with the furthest ahead cqe to update the sq state */ - uint64_t my_lane_mask = 1ull << __lane_id(); - uint64_t lesser_lane_mask = my_lane_mask - 1; - if (my_lane_mask != (__ballot(true) & activemask & ~lesser_lane_mask)) { - return 0; - } - - /* update position in the cq */ - cqHandle.cq_consumer = my_cq_pos + 1; - - /* - * Ring cq doorbell frequently enough to avoid cq full. - * - * NB: IONIC_CQ_GRACE is 100 - */ - if (((cqHandle.cq_consumer - cqHandle.cq_dbpos) & (cqHandle.cqeNum - 1)) >= 100) { - cqHandle.cq_dbpos = cqHandle.cq_consumer; - uint64_t dbrVal = cqHandle.cq_dbval | ((cqHandle.cqeNum - 1) & (cqHandle.cq_dbpos)); -#if 0 - MORI_PRINTF("update cq doorbell, block:%u, warp:%u, lane:%u, cq dbrAddr:%p, dbrVal:0x%lx, cq_consumer:%u\n", - blockIdx.x, threadIdx.x/warpSize, __lane_id(), reinterpret_cast(cqHandle.dbrRecAddr), dbrVal, cqHandle.cq_consumer); -#endif - __atomic_store_n(reinterpret_cast(cqHandle.dbrRecAddr), dbrVal, - __ATOMIC_SEQ_CST); // TODO:maybe relaxed? - } - - wqHandle.doneIdx = msn; - return 0; -} -#endif - -#ifdef IONIC_CCQE -template <> -inline __device__ int PollCq(WorkQueueHandle& wqHandle, - CompletionQueueHandle& cqHandle, void* cqAddr, - uint32_t cqeNum, uint32_t* consIdx, - uint16_t* wqeCounter) { - PollCqOnce2(wqHandle, cqHandle, 1, cqAddr, cqeNum, *consIdx); - *wqeCounter = *consIdx; - return 0; -} -#else -template <> -inline __device__ int PollCq(WorkQueueHandle& wqHandle, - CompletionQueueHandle& cqHandle, void* cqAddr, - uint32_t cqeNum, uint32_t* consIdx, - uint16_t* wqeCounter) { - uint32_t greed = 10; - const uint32_t curConsIdx = *consIdx; - uint64_t activemask = GetActiveLaneMask(); - uint32_t cons = wqHandle.dbTouchIdx; - int err; - /* wait for sq_msn to catch up or pass cons. */ - /* 0x800000 - sign bit for 24-bit fields */ - while ((wqHandle.doneIdx - cons) & 0x800000) { - if (!spin_lock_try_acquire_shared(&cqHandle.pollCqLock, activemask)) { - continue; - } - - /* with lock acquired, this wave polls cqes until caught up */ - while ((wqHandle.doneIdx - cons) & 0x800000) { - uint32_t old_sq_msn = wqHandle.doneIdx; - // MORI_PRINTF("PollCq, before PollCqOnce2, curConsIdx:%u\n", curConsIdx); - // asm volatile("" ::: "memory"); - err = PollCqOnce2(wqHandle, cqHandle, activemask, cqAddr, cqeNum, curConsIdx); - if (err != 0) { - MORI_PRINTF("PollCq, PollCqOnce2 failed, err:%u\n", err); - return err; - } - asm volatile("" ::: "memory"); - // MORI_PRINTF("PollCq, after PollCqOnce2, curConsIdx:%u\n", curConsIdx); - if (!((wqHandle.doneIdx - cons) & 0x800000)) { - if (wqHandle.doneIdx == old_sq_msn) { - break; - } - if (!greed) { - break; - } - --greed; - } - } - - spin_lock_release_shared(&cqHandle.pollCqLock, activemask); - break; - } - - return 0; -} -#endif +#endif // end of PollCq template <> inline __device__ void UpdateCqDbrRecord(CompletionQueueHandle& cq, @@ -814,19 +531,6 @@ inline __device__ void UpdateCqDbrRecord(CompletionQueueHandl #endif } -template <> -inline __device__ int PollCqAndUpdateDbr(CompletionQueueHandle& cq, - uint32_t* consIdx, uint32_t* lockVar) { - AcquireLock(lockVar); - - int err = PollCq(cq.cqAddr, cq.cqeNum, consIdx); - if (err >= 0) { - UpdateCqDbrRecord(cq, *consIdx); - } - - ReleaseLock(lockVar); - return err; -} // #endif } // namespace core } // namespace mori diff --git a/include/mori/core/transport/rdma/providers/ionic/ionic_dv.h b/include/mori/core/transport/rdma/providers/ionic/ionic_dv.h index a1815990..faa167ec 100644 --- a/include/mori/core/transport/rdma/providers/ionic/ionic_dv.h +++ b/include/mori/core/transport/rdma/providers/ionic/ionic_dv.h @@ -177,6 +177,16 @@ int ionic_dv_pd_set_sqcmb(struct ibv_pd* ibpd, bool enable, bool expdb, bool req */ int ionic_dv_pd_set_rqcmb(struct ibv_pd* ibpd, bool enable, bool expdb, bool require); +/** + * ionic_dv_pd_set_expdb_mask - Specify expdb mask. + * + * Queues associated with this pd will attempt to have expdb on for WQE sizes + * other than default (and supported by the NIC). + * + * @mask - IONIC_EPXDB_* bitmap + */ +int ionic_dv_pd_set_expdb_mask(struct ibv_pd* ibpd, uint8_t mask); + /** * ionic_dv_qp_set_gda - Enable or disable GPU-Direct Async (GDA) mode. * @@ -241,6 +251,31 @@ int ionic_dv_qp_get_send_dbell_data(struct ibv_qp* ibqp, uint64_t* dbdata); */ int ionic_dv_qp_get_recv_dbell_data(struct ibv_qp* ibqp, uint64_t* dbdata); +enum ionic_cq_init_attr_mask { + IONIC_CQ_INIT_ATTR_MASK_FLAGS = 1 << 0, +}; + +enum ionic_cq_init_attr_flags { + IONIC_CQ_INIT_ATTR_CCQE = 1 << 0, +}; + +struct ionic_cq_init_attr_ex { + /* One or more flags of enum ionic_cq_init_attr_mask */ + uint32_t comp_mask; + /* One or more flags of enum ionic_cq_init_attr_flags */ + uint32_t flags; +}; + +/** + * ionic_dv_create_cq_ex - Create an IBV CQ with vendor-specific attributes. + * + * @ibctx - Context CQ will be attached to. + * @ex - IBV attributes to create the CQ with. + * @ionic_ex - Vendor-specific attributes to create the CQ with. + */ +struct ibv_cq_ex* ionic_dv_create_cq_ex(struct ibv_context* ibctx, struct ibv_cq_init_attr_ex* ex, + struct ionic_cq_init_attr_ex* ionic_ex); + /** * ionic_dv_get_ctx - Extract context information for gpu-initiated rdma. */ diff --git a/include/mori/core/transport/rdma/providers/ionic/ionic_fw.h b/include/mori/core/transport/rdma/providers/ionic/ionic_fw.h index da5d5371..7bc40919 100644 --- a/include/mori/core/transport/rdma/providers/ionic/ionic_fw.h +++ b/include/mori/core/transport/rdma/providers/ionic/ionic_fw.h @@ -151,22 +151,34 @@ union ionic_v1_pld { __u8 data[32]; }; +struct ionic_v1_cqe_send { + __u8 rsvd[4]; + __be32 msg_msn; + __u8 rsvd2[8]; + __le64 npg_wqe_idx_timestamp; +}; + +struct ionic_v1_cqe_recv { + __le64 wqe_idx_timestamp; + __be32 src_qpn_op; + __u8 src_mac[6]; + __be16 vlan_tag; + __be32 imm_data_rkey; +}; + +struct ionic_v1_cqe_rcqe { + __be64 wqe_idx_timestamp; + __u8 rsvd[8]; + __be32 seq_op_flags; + __be32 imm_data_rkey; +}; + /* completion queue v1 cqe */ struct ionic_v1_cqe { union { - struct { - __le64 wqe_idx_timestamp; - __be32 src_qpn_op; - __u8 src_mac[6]; - __be16 vlan_tag; - __be32 imm_data_rkey; - } recv; - struct { - __u8 rsvd[4]; - __be32 msg_msn; - __u8 rsvd2[8]; - __le64 npg_wqe_idx_timestamp; - } send; + struct ionic_v1_cqe_send send; + struct ionic_v1_cqe_recv recv; + struct ionic_v1_cqe_rcqe rcqe; }; __be32 status_length; __be32 qid_type_flags; @@ -178,6 +190,30 @@ enum ionic_v1_cqe_wqe_idx_timestamp_bits { IONIC_V1_CQE_TIMESTAMP_SHIFT = 16, }; +/* bits for rcqe seq_op_flags */ +enum ionic_v1_cqe_rcqe_op_flag_bits { + IONIC_V1_CQE_RCQE_SEQ_MASK = 0xffffff, + IONIC_V1_CQE_RCQE_FLAG_V = BIT(24), + IONIC_V1_CQE_RCQE_FLAG_I = BIT(25), + IONIC_V1_CQE_RCQE_OP_SHIFT = 28, +}; + +static inline uint32_t ionic_v1_rcqe_seq(uint32_t seq_opf) { + return seq_opf & IONIC_V1_CQE_RCQE_SEQ_MASK; +} + +static inline uint8_t ionic_v1_rcqe_op(uint32_t seq_opf) { + return seq_opf >> IONIC_V1_CQE_RCQE_OP_SHIFT; +} + +static inline bool ionic_v1_rcqe_valid(uint32_t seq_opf) { + return seq_opf & IONIC_V1_CQE_RCQE_FLAG_V; +} + +static inline bool ionic_v1_rcqe_ready(uint32_t seq_opf) { + return seq_opf & IONIC_V1_CQE_RCQE_FLAG_I; +} + /* bits for cqe recv */ enum ionic_v1_cqe_src_qpn_bits { IONIC_V1_CQE_RECV_QPN_MASK = 0xffffff, @@ -207,7 +243,7 @@ enum ionic_v1_cqe_qtf_bits { IONIC_V1_CQE_TYPE_RECV = 1, IONIC_V1_CQE_TYPE_SEND_MSN = 2, IONIC_V1_CQE_TYPE_SEND_NPG = 3, - IONIC_V1_CQE_TYPE_RECV_INDIR = 4, + IONIC_V1_CQE_TYPE_RECV_RCQE = 4, }; #if !defined(__HIP_PLATFORM_AMD__) && !defined(__HIP_PLATFORM_HCC__) @@ -462,46 +498,25 @@ static inline int ionic_v1_use_spec_sge(int min_sge, int spec) { } #define IONIC_RCQ_SIZE 4096 -#define IONIC_RCQ_DEPTH 128 -#define IONIC_RCQ_DEPTH_LOG2 7 -#define IONIC_RCQ_STRIDE_LOG2 4 struct ionic_rcq_hdr { - uint8_t pad[60]; - uint32_t seq_pad; -}; - -struct ionic_rcqe { - uint32_t status_length; - uint32_t imm_data; - uint32_t seq_flags; - uint32_t rsvd; -}; - -enum ionic_rcqe_flag { - IONIC_RCQE_C = BIT(7), - IONIC_RCQE_I = BIT(6), + __be32 seq; + __be32 ack; }; struct ionic_rcq { - struct ionic_rcq_hdr hdr; - struct ionic_rcqe ring[IONIC_RCQ_DEPTH]; + union { + uint8_t bytes[IONIC_RCQ_SIZE]; + struct ionic_rcq_hdr hdr; + }; }; -static inline uint32_t ionic_rcq_hdr_seq(struct ionic_rcq_hdr* hdr) { - return be32toh(hdr->seq_pad) >> 8; -} - -static inline uint32_t ionic_rcqe_seq(struct ionic_rcqe* rcqe) { - return be32toh(rcqe->seq_flags) >> 8; -} - -static inline bool ionic_rcqe_color(struct ionic_rcqe* rcqe) { - return !!(rcqe->seq_flags & htobe32(IONIC_RCQE_C)); +static inline uint32_t ionic_rcq_seq(struct ionic_rcq* rcq) { + return be32toh(rcq->hdr.seq) & IONIC_V1_CQE_RCQE_SEQ_MASK; } -static inline bool ionic_rcqe_imm(struct ionic_rcqe* rcqe) { - return !!(rcqe->seq_flags & htobe32(IONIC_RCQE_I)); +static inline void ionic_rcq_ack(struct ionic_rcq* rcq, uint32_t ack) { + rcq->hdr.ack = htobe32(ack); } #endif // !defined(__cplusplus) diff --git a/python/mori/jit/cache.py b/python/mori/jit/cache.py index c8ab2ebb..92f9f076 100644 --- a/python/mori/jit/cache.py +++ b/python/mori/jit/cache.py @@ -62,10 +62,11 @@ def get_cache_dir( profiler: bool = False, *, cov: int | None = None, + ccqe: bool = False, ) -> Path: """Return the cache directory for a specific arch + NIC + content combo. - Structure: /_[_profiler][_cov]// + Structure: /_[_ccqe][_profiler][_cov]// Args: profiler: When True, appends '_profiler' to the directory name so that @@ -74,10 +75,17 @@ def get_cache_dir( included in the directory name to separate bitcode compiled with different ABI versions (e.g. cov5 for Triton, cov6 for FlyDSL). None omits the suffix for backward compatibility. + ccqe: When True, appends '_ccqe' so CCQE and non-CCQE kernels are + cached separately (they differ by -DIONIC_CCQE compile flag). """ content_hash = _hash_tree(source_paths) + ccqe_suffix = "_ccqe" if ccqe else "" profiler_suffix = "_profiler" if profiler else "" cov_suffix = f"_cov{cov}" if cov is not None else "" - d = get_cache_root() / f"{arch}_{nic}{profiler_suffix}{cov_suffix}" / content_hash + d = ( + get_cache_root() + / f"{arch}_{nic}{ccqe_suffix}{profiler_suffix}{cov_suffix}" + / content_hash + ) d.mkdir(parents=True, exist_ok=True) return d diff --git a/python/mori/jit/core.py b/python/mori/jit/core.py index 825d56f3..5b4c1688 100644 --- a/python/mori/jit/core.py +++ b/python/mori/jit/core.py @@ -114,6 +114,7 @@ def _hipcc_device_bc( "-D__HIP_PLATFORM_AMD__", "-DHIP_ENABLE_WARP_SYNC_BUILTINS", *_nic_defines(), + *_ccqe_defines(), *_profiler_defines(), ] for d in include_dirs: @@ -162,6 +163,107 @@ def _verify_bitcode(cfg: BuildConfig, bc_path: Path) -> None: ) +def _lib_has_ionic_ccqe() -> bool: + """Check whether the ionic driver supports CCQE by probing the runtime library symbol.""" + import ctypes + import ctypes.util + + lib_name = ctypes.util.find_library("ionic") + if lib_name is None: + return False + try: + lib = ctypes.CDLL(lib_name) + return hasattr(lib, "ionic_dv_create_cq_ex") + except OSError: + return False + + +_CCQE_MIN_FW_VERSION = (1, 117, 5, 58) + + +def _parse_ionic_fw_version(fw_ver: str) -> tuple[int, ...] | None: + """Parse '1.117.5-a-58' → (1, 117, 5, 58). Returns None if unparseable.""" + if not fw_ver: + return None + m = re.match(r"^(\d+)\.(\d+)\.(\d+)-a-?(\d+)$", fw_ver) + if not m: + return None + return tuple(int(x) for x in m.groups()) + + +def _is_firmware_support_ccqe(fw_ver: str) -> bool: + """Return True if the firmware version >= 1.117.5-a-58.""" + ver = _parse_ionic_fw_version(fw_ver) + return ver is not None and ver >= _CCQE_MIN_FW_VERSION + + +def _get_ionic_fw_versions() -> list[str]: + """Return fw_ver strings for every ionic IB device found in sysfs.""" + ib_dir = "/sys/class/infiniband" + versions: list[str] = [] + try: + for dev in os.listdir(ib_dir): + dev_path = os.path.join(ib_dir, dev) + driver_link = os.path.join(dev_path, "device", "driver") + try: + driver_name = os.path.basename(os.readlink(driver_link)) + except OSError: + continue + if driver_name not in ("ionic_rdma", "ionic"): + continue + fw_path = os.path.join(dev_path, "fw_ver") + try: + fw_ver = Path(fw_path).read_text().strip() + versions.append(fw_ver) + except OSError: + pass + except OSError: + pass + return versions + + +def _is_all_ionic_support_ccqe() -> bool: + """Return True only when every ionic device has the same fw version and that version >= 58.""" + versions = _get_ionic_fw_versions() + if not versions: + return False + if len(set(versions)) != 1: + return False + + print(f"ionic ver: {versions[-1]}") + + for ver in versions: + if not _is_firmware_support_ccqe(ver): + return False + + return True + + +_ccqe_enabled: bool | None = None + + +def is_ccqe_enabled() -> bool: + """Return True if CCQE should be enabled (cached after first call).""" + global _ccqe_enabled + if _ccqe_enabled is None: + if os.environ.get("MORI_DISABLE_IONIC_CCQE") == "1": + _ccqe_enabled = False + print("Ionic _ccqe_enabled: False (disabled by MORI_DISABLE_IONIC_CCQE)") + else: + lib_support = _lib_has_ionic_ccqe() + nic_support = _is_all_ionic_support_ccqe() + _ccqe_enabled = lib_support and nic_support + print( + f"Ionic _ccqe_enabled: {_ccqe_enabled} lib_support {lib_support} nic_support: {nic_support}" + ) + + return _ccqe_enabled + + +def _ccqe_defines() -> list[str]: + return ["-DIONIC_CCQE"] if is_ccqe_enabled() else [] + + def _nic_defines() -> list[str]: """Return compiler -D flags for the detected NIC type (device-side macros).""" nic = detect_nic_type() @@ -305,8 +407,10 @@ def _hipcc_genco( "-D__HIP_PLATFORM_AMD__", "-DHIP_ENABLE_WARP_SYNC_BUILTINS", *_nic_defines(), + *_ccqe_defines(), *_profiler_defines(), ] + for d in include_dirs: cmd.extend(["-I", str(d)]) cmd.extend([str(source), "-o", str(output)]) @@ -379,6 +483,7 @@ def compile_genco( cfg = detect_build_config() nic = detect_nic_type() profiler = is_profiler_enabled() + ccqe = is_ccqe_enabled() include_dirs = _collect_include_dirs(mori_root) sub_kernels = _PARALLEL_KERNEL_GROUPS.get(kernel_name) @@ -387,7 +492,9 @@ def compile_genco( mori_root / "src" / "ops" / "kernels", mori_root / "include" / "mori", ] - cache_dir = get_cache_dir(cfg.arch, source_paths, nic, profiler=profiler) + cache_dir = get_cache_dir( + cfg.arch, source_paths, nic, profiler=profiler, ccqe=ccqe + ) hsaco_paths = [cache_dir / f"{k}.hsaco" for k in sub_kernels] if all(p.is_file() for p in hsaco_paths): @@ -432,7 +539,7 @@ def compile_genco( raise FileNotFoundError(f"Kernel source not found: {source}") source_paths = [source, mori_root / "include" / "mori"] - cache_dir = get_cache_dir(cfg.arch, source_paths, nic, profiler=profiler) + cache_dir = get_cache_dir(cfg.arch, source_paths, nic, profiler=profiler, ccqe=ccqe) hsaco_path = cache_dir / f"{kernel_name}.hsaco" if hsaco_path.is_file(): @@ -448,7 +555,7 @@ def compile_genco( nic = detect_nic_type() print( f"[mori-jit] Compiling {kernel_name} for {cfg.arch} " - f"(nic={nic}, profiler={profiler}) ..." + f"(nic={nic}, ccqe={ccqe}, profiler={profiler}) ..." ) _hipcc_genco(cfg, source, include_dirs, hsaco_path) print(f"[mori-jit] Cached: {hsaco_path}") @@ -476,12 +583,15 @@ def ensure_bitcode(*, cov: int = 5) -> str: nic = detect_nic_type() profiler = is_profiler_enabled() + ccqe = is_ccqe_enabled() source_paths = [ mori_root / "src" / "shmem" / "shmem_device_api_wrapper.cpp", mori_root / "include" / "mori" / "shmem", mori_root / "include" / "mori" / "core", ] - cache_dir = get_cache_dir(cfg.arch, source_paths, nic, profiler=profiler, cov=cov) + cache_dir = get_cache_dir( + cfg.arch, source_paths, nic, profiler=profiler, cov=cov, ccqe=ccqe + ) bc_path = cache_dir / _BC_FILENAME if bc_path.is_file(): diff --git a/src/application/transport/rdma/providers/ionic/ionic.cpp b/src/application/transport/rdma/providers/ionic/ionic.cpp index e5fea823..d0a2c4cc 100644 --- a/src/application/transport/rdma/providers/ionic/ionic.cpp +++ b/src/application/transport/rdma/providers/ionic/ionic.cpp @@ -25,8 +25,13 @@ #include #include +#include +#include +#include +#include #include #include +#include #include "mori/application/utils/check.hpp" #include "mori/application/utils/math.hpp" @@ -40,6 +45,53 @@ namespace application { /* Device Attributes */ /* ---------------------------------------------------------------------------------------------- */ +namespace { + +using FwVersion = std::tuple; +constexpr FwVersion kCcqeMinFwVersion{1, 117, 5, 58}; + +// Parse "1.117.5-a-58" or "1.117.5-a58" into (1,117,5,58). +std::optional ParseIonicFwVersion(const char* fw_ver) { + int major, minor, patch, build; + char tag; + if (sscanf(fw_ver, "%d.%d.%d-%c-%d", &major, &minor, &patch, &tag, &build) == 5 || + sscanf(fw_ver, "%d.%d.%d-%c%d", &major, &minor, &patch, &tag, &build) == 5) { + return FwVersion{major, minor, patch, build}; + } + return std::nullopt; +} + +std::optional ReadIonicFwVersion(const char* dev_name) { + char path[256]; + snprintf(path, sizeof(path), "/sys/class/infiniband/%s/fw_ver", dev_name); + + FILE* f = fopen(path, "r"); + if (!f) return std::nullopt; + + char buf[64] = {}; + fgets(buf, sizeof(buf), f); + fclose(f); + + // Strip trailing newline. + buf[strcspn(buf, "\n")] = '\0'; + return ParseIonicFwVersion(buf); +} + +bool IsCcqeSupported(ibv_context* context) { + const char* disable_ccqe = std::getenv("MORI_DISABLE_IONIC_CCQE"); + if (disable_ccqe && std::strcmp(disable_ccqe, "1") == 0) return false; + if (IonicDvApi::Instance().create_cq_ex == nullptr) return false; + + /* Minimum firmware version verified by MORI to support CCQE is 1.117.5-a-58. */ + auto ver = ReadIonicFwVersion(context->device->name); + MORI_APP_TRACE("dev: {} fw_ver {}.{}.{}-a-{}", context->device->name, + ver ? std::get<0>(*ver) : -1, ver ? std::get<1>(*ver) : -1, + ver ? std::get<2>(*ver) : -1, ver ? std::get<3>(*ver) : -1); + return ver.has_value() && *ver >= kCcqeMinFwVersion; +} + +} // namespace + /* ---------------------------------------------------------------------------------------------- */ /* IonicCqContainer */ /* ---------------------------------------------------------------------------------------------- */ @@ -52,14 +104,9 @@ IonicCqContainer::IonicCqContainer(ibv_context* context, const RdmaEndpointConfi cqeNum = config.maxCqeNum; + const bool ccqe_enabled = IsCcqeSupported(context); + memset(&cq_attr, 0, sizeof(struct ibv_cq_init_attr_ex)); -#ifdef IONIC_CCQE - cq_attr.cqe = 0; - MORI_APP_TRACE("cqe mode: ccqe mode"); -#else - cq_attr.cqe = cqeNum * 2; // from rocshmem, send&recv? - MORI_APP_TRACE("cqe mode: normal mode"); -#endif cq_attr.cq_context = nullptr; cq_attr.channel = nullptr; cq_attr.comp_vector = 0; @@ -67,7 +114,20 @@ IonicCqContainer::IonicCqContainer(ibv_context* context, const RdmaEndpointConfi cq_attr.comp_mask = IBV_CQ_INIT_ATTR_MASK_PD; cq_attr.parent_domain = pd; - cq_ex = ibv_create_cq_ex(context, &cq_attr); + if (ccqe_enabled) { + MORI_APP_TRACE("cqe mode: ccqe mode"); + struct ionic_cq_init_attr_ex ionic_cq_attr; + memset(&ionic_cq_attr, 0, sizeof(struct ionic_cq_init_attr_ex)); + ionic_cq_attr.comp_mask = IONIC_CQ_INIT_ATTR_MASK_FLAGS; + ionic_cq_attr.flags = IONIC_CQ_INIT_ATTR_CCQE; + cq_attr.cqe = 1; + cq_ex = IonicDvApi::Instance().create_cq_ex(context, &cq_attr, &ionic_cq_attr); + } else { + MORI_APP_TRACE("cqe mode: normal mode"); + cq_attr.cqe = cqeNum * 2; // from rocshmem, send&recv? + cq_ex = ibv_create_cq_ex(context, &cq_attr); + } + assert(cq_ex); cq = ibv_cq_ex_to_cq(cq_ex); assert(cq);