Skip to content

Commit 95e1069

Browse files
fix(vm): Preserve sync helper frame when it suspends inside async coro.
1 parent 1b330ec commit 95e1069

11 files changed

Lines changed: 149 additions & 42 deletions

File tree

compiler/src/main/exports.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -231,8 +231,10 @@ pub unsafe extern "C" fn run_push_event(ptr: *const u8, len: u32) -> i32 {
231231
.find(|(_, h)| matches!(h.state, crate::modules::vm::types::CoroState::WaitingEvent))
232232
.map(|(i, h)| (i, h.coro));
233233
if let Some((idx, coro)) = waiter {
234-
if let crate::modules::vm::types::HeapObj::Coroutine(_, _, saved_stack, _, _) = vm.heap.get_mut(coro) {
235-
if let Some(top) = saved_stack.last_mut() { *top = val; } else { saved_stack.push(val); }
234+
if let crate::modules::vm::types::HeapObj::Coroutine(_, _, saved_stack, _, _, sub_frames) = vm.heap.get_mut(coro) {
235+
// When a sync helper called from this coro hit `receive()`, the None placeholder lives in the innermost suspended frame's stack — not the outer's. Pick that frame's buffer when present.
236+
let target_stack = if let Some(frame) = sub_frames.last_mut() { &mut frame.stack_delta } else { saved_stack };
237+
if let Some(top) = target_stack.last_mut() { *top = val; } else { target_stack.push(val); }
236238
}
237239
vm.scheduler[idx].state = crate::modules::vm::types::CoroState::Ready;
238240
} else {
@@ -257,8 +259,9 @@ pub unsafe extern "C" fn set_host_result(handle: u32) -> i32 {
257259
.find(|(_, h)| matches!(h.state, crate::modules::vm::types::CoroState::WaitingHostCall))
258260
.map(|(i, h)| (i, h.coro));
259261
let Some((idx, coro)) = waiter else { return 2; };
260-
if let crate::modules::vm::types::HeapObj::Coroutine(_, _, saved_stack, _, _) = vm.heap.get_mut(coro) {
261-
if let Some(top) = saved_stack.last_mut() { *top = val; } else { saved_stack.push(val); }
262+
if let crate::modules::vm::types::HeapObj::Coroutine(_, _, saved_stack, _, _, sub_frames) = vm.heap.get_mut(coro) {
263+
let target_stack = if let Some(frame) = sub_frames.last_mut() { &mut frame.stack_delta } else { saved_stack };
264+
if let Some(top) = target_stack.last_mut() { *top = val; } else { target_stack.push(val); }
262265
}
263266
vm.scheduler[idx].state = crate::modules::vm::types::CoroState::Ready;
264267
0

compiler/src/modules/vm/builtins/async_ops.rs

Lines changed: 77 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -5,41 +5,88 @@ use super::super::types::*;
55

66
impl<'a> VM<'a> {
77

8-
/* Resume a coroutine. On yield: persist ip/slots/stack/iters back into it. On return: restore caller stack/iter state. */
8+
// Resume coroutine: persist state on yield, restore caller on return. Suspended sync sub-frames run innermost-first, each pushing its result onto the next frame's stack at the Call site.
99
pub fn resume_coroutine(&mut self, callee: Val) -> Result<Val, VmErr> {
10-
if let HeapObj::Coroutine(ip, saved_slots, saved_stack, fi, saved_iters) = self.heap.get(callee) {
11-
let (ip, fi) = (*ip, *fi);
12-
let mut fn_slots = saved_slots.clone();
13-
let saved_stack_len = self.stack.len();
14-
let saved_iter_len = self.iter_stack.len();
15-
self.stack.extend_from_slice(&saved_stack.clone());
16-
self.iter_stack.extend(saved_iters.clone());
17-
let saved_yielded = self.yielded;
18-
self.yielded = false;
19-
self.depth += 1;
20-
let (_, body, _, _) = self.functions[fi];
21-
let result = self.exec_from(body, &mut fn_slots, ip);
22-
self.depth -= 1;
23-
let result = result?;
24-
if self.yielded {
25-
let resume_ip = self.resume_ip;
26-
let remaining = self.stack.split_off(saved_stack_len);
27-
let coro_iters: Vec<IterFrame> = self.iter_stack.drain(saved_iter_len..).collect();
28-
if let HeapObj::Coroutine(sip, ss, sst, _, si) = self.heap.get_mut(callee) {
29-
*sip = resume_ip;
30-
*ss = fn_slots;
31-
*sst = remaining;
32-
*si = coro_iters;
10+
let (outer_ip, mut outer_slots, outer_stack, outer_fi, outer_iters, mut sync_frames) =
11+
if let HeapObj::Coroutine(ip, slots, stack, fi, iters, sf) = self.heap.get(callee) {
12+
(*ip, slots.clone(), stack.clone(), *fi, iters.clone(), sf.clone())
13+
} else {
14+
return Err(cold_type("not a coroutine"));
15+
};
16+
17+
let saved_stack_len = self.stack.len();
18+
let saved_iter_len = self.iter_stack.len();
19+
self.stack.extend_from_slice(&outer_stack);
20+
self.iter_stack.extend(outer_iters);
21+
let saved_yielded = self.yielded;
22+
self.yielded = false;
23+
self.depth += 1;
24+
25+
// Walk frames inside-out, then the outer. `outer_ran` tracks whether `outer_ip` should be overwritten by `resume_ip` on save — a re-yield inside a sync frame leaves the outer pristine.
26+
let mut outer_ran = false;
27+
let result: Result<Val, VmErr> = 'drive: loop {
28+
if let Some(frame) = sync_frames.pop() {
29+
let SyncFrame { ip, fi, mut slots, stack_delta, iter_delta } = frame;
30+
let frame_stack_base = self.stack.len();
31+
let frame_iter_base = self.iter_stack.len();
32+
self.stack.extend(stack_delta);
33+
self.iter_stack.extend(iter_delta);
34+
let (_, body, _, _) = self.functions[fi];
35+
match self.exec_from(body, &mut slots, ip) {
36+
Err(e) => break 'drive Err(e),
37+
Ok(val) if self.yielded => {
38+
let new_stack = if self.stack.len() > frame_stack_base { self.stack.split_off(frame_stack_base) } else { Vec::new() };
39+
let new_iter: Vec<IterFrame> = if self.iter_stack.len() > frame_iter_base { self.iter_stack.drain(frame_iter_base..).collect() } else { Vec::new() };
40+
sync_frames.push(SyncFrame {
41+
ip: self.resume_ip, fi, slots,
42+
stack_delta: new_stack, iter_delta: new_iter,
43+
});
44+
// Any deeper sync calls suspended during this exec come back via the VM-level buffer; chain them on top (still innermost-last).
45+
let newer = core::mem::take(&mut self.pending_sync_frames);
46+
sync_frames.extend(newer);
47+
break 'drive Ok(val);
48+
}
49+
Ok(val) => {
50+
// Frame completed; its return value feeds whatever frame (or outer) was waiting at the Call site.
51+
self.push(val);
52+
}
3353
}
34-
Ok(result)
3554
} else {
36-
self.stack.truncate(saved_stack_len);
37-
self.iter_stack.truncate(saved_iter_len);
38-
self.yielded = saved_yielded;
39-
Ok(result)
55+
let (_, body, _, _) = self.functions[outer_fi];
56+
outer_ran = true;
57+
match self.exec_from(body, &mut outer_slots, outer_ip) {
58+
Err(e) => break 'drive Err(e),
59+
Ok(val) => {
60+
if self.yielded {
61+
let newer = core::mem::take(&mut self.pending_sync_frames);
62+
sync_frames.extend(newer);
63+
}
64+
break 'drive Ok(val);
65+
}
66+
}
67+
}
68+
};
69+
70+
self.depth -= 1;
71+
let result = result?;
72+
73+
if self.yielded {
74+
let resume_ip = if outer_ran { self.resume_ip } else { outer_ip };
75+
let remaining = self.stack.split_off(saved_stack_len);
76+
let coro_iters: Vec<IterFrame> = self.iter_stack.drain(saved_iter_len..).collect();
77+
if let HeapObj::Coroutine(sip, ss, sst, _, si, sf) = self.heap.get_mut(callee) {
78+
*sip = resume_ip;
79+
*ss = outer_slots;
80+
*sst = remaining;
81+
*si = coro_iters;
82+
*sf = sync_frames;
4083
}
84+
Ok(result)
4185
} else {
42-
Err(cold_type("not a coroutine"))
86+
self.stack.truncate(saved_stack_len);
87+
self.iter_stack.truncate(saved_iter_len);
88+
self.yielded = saved_yielded;
89+
Ok(result)
4390
}
4491
}
4592

compiler/src/modules/vm/dispatch.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -169,10 +169,11 @@ impl<'a> VM<'a> {
169169
match self.dispatch(chunk, slots, &mut cache, insns, consts, &mut ip) {
170170
Ok(None) => {
171171
if self.yielded {
172-
// Event yields keep the None placeholder; `run_push_event` overwrites it before resume.
172+
// Event yields keep the None placeholder (overwritten by `run_push_event` before resume). Sync sub-call yields pushed nothing — the helper's return lands on the stack when its frame completes — so don't pop and don't skip the next PopTop.
173173
let event_yield = self.pending.event_wait_request;
174-
let val = if event_yield { Val::none() } else { self.pop().unwrap_or(Val::none()) };
175-
self.resume_ip = if !event_yield && ip < n && matches!(insns.get(ip), Some(ins) if ins.opcode == OpCode::PopTop) { ip + 1 } else { ip };
174+
let sub_call_yield = !self.pending_sync_frames.is_empty();
175+
let val = if event_yield || sub_call_yield { Val::none() } else { self.pop().unwrap_or(Val::none()) };
176+
self.resume_ip = if !event_yield && !sub_call_yield && ip < n && matches!(insns.get(ip), Some(ins) if ins.opcode == OpCode::PopTop) { ip + 1 } else { ip };
176177
self.live_slots.truncate(slots_base);
177178
self.exception_stack.truncate(exc_base);
178179
return Ok(val);

compiler/src/modules/vm/gc.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,18 @@ impl<'a> VM<'a> {
66
/* Mark all reachable roots then sweep; non-heap Vals are no-op to mark. */
77
pub(crate) fn collect(&mut self, current_slots: &[Val]) {
88
for &v in &self.stack { self.heap.mark(v); }
9+
for sf in &self.pending_sync_frames {
10+
for &v in &sf.slots { self.heap.mark(v); }
11+
for &v in &sf.stack_delta { self.heap.mark(v); }
12+
for fr in &sf.iter_delta {
13+
match fr {
14+
IterFrame::Seq { items, .. } => for &v in items { self.heap.mark(v); },
15+
IterFrame::Coroutine(v) => self.heap.mark(*v),
16+
IterFrame::UserDefined(v) => self.heap.mark(*v),
17+
IterFrame::Range { .. } => {}
18+
}
19+
}
20+
}
921
for &v in &self.with_stack { self.heap.mark(v); }
1022
for &v in &self.yields { self.heap.mark(v); }
1123
for &v in &self.event_queue { self.heap.mark(v); }

compiler/src/modules/vm/handlers/function.rs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,12 +145,15 @@ impl<'a> VM<'a> {
145145
// Generator/coroutine: return a suspended Coroutine instead of running. Both flags are O(1).
146146
let is_async_fn = self.is_async.get(fi).copied().unwrap_or(false);
147147
if is_async_fn || body.is_generator {
148-
let coro = self.heap.alloc(HeapObj::Coroutine(0, fn_slots, Vec::new(), fi, Vec::new()))?;
148+
let coro = self.heap.alloc(HeapObj::Coroutine(0, fn_slots, Vec::new(), fi, Vec::new(), Vec::new()))?;
149149
self.push(coro);
150150
self.depth -= 1;
151151
return Ok(());
152152
}
153153

154+
// Snapshot caller-visible depths so we can split the helper's stack/iter contributions out if it suspends mid-body via a yielding builtin.
155+
let stack_base = self.stack.len();
156+
let iter_base = self.iter_stack.len();
154157
let yields_before = self.yields.len();
155158
let (callee_impure, exec_result) = self.run_body_with_frame(fi, body, chunk, &mut fn_slots, slots);
156159
self.depth -= 1;
@@ -160,6 +163,19 @@ impl<'a> VM<'a> {
160163
let result = exec_result?;
161164
if callee_impure { self.mark_impure(); }
162165

166+
if self.yielded {
167+
// Sync helper suspended mid-execution (e.g. `sleep(0)` from inside a sync fn called by an async coro). Stage its frame on the VM-level buffer; `resume_coroutine` drains it onto the enclosing coro so the helper is re-entered from the right ip. Without this, the outer's resume_ip would skip past the unfinished helper and the next StoreName would underflow. A nested sync call inside this helper would already have pushed its own frame first, so the buffer ends up innermost-last.
168+
let helper_resume_ip = self.resume_ip;
169+
self.resume_ip = 0;
170+
let helper_stack_delta = if self.stack.len() > stack_base { self.stack.split_off(stack_base) } else { Vec::new() };
171+
let helper_iter_delta: Vec<IterFrame> = if self.iter_stack.len() > iter_base { self.iter_stack.drain(iter_base..).collect() } else { Vec::new() };
172+
self.pending_sync_frames.push(SyncFrame {
173+
ip: helper_resume_ip, fi, slots: fn_slots,
174+
stack_delta: helper_stack_delta, iter_delta: helper_iter_delta,
175+
});
176+
return Ok(());
177+
}
178+
163179
if self.yields.len() > yields_before {
164180
let fn_yields = self.yields.split_off(yields_before);
165181
let val = self.heap.alloc(HeapObj::List(Rc::new(RefCell::new(fn_yields))))?;

compiler/src/modules/vm/handlers/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ mod methods_helpers;
99

1010
pub(super) use crate::modules::vm::{
1111
VM, Val, VmErr, HeapObj, DictMap, cache, ops,
12-
types::{cold_depth, cold_type, cold_value, cold_runtime, cold_overflow, eq_vals_with_heap, ffloor}
12+
types::{IterFrame, SyncFrame, cold_depth, cold_type, cold_value, cold_runtime, cold_overflow, eq_vals_with_heap, ffloor}
1313
};
1414

1515
pub(super) use crate::modules::parser::{OpCode, SSAChunk, ssa_strip};

compiler/src/modules/vm/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,8 @@ pub struct VM<'a> {
115115
pub(crate) sandbox_off: bool,
116116
pub(crate) with_stack: Vec<Val>,
117117
pub(crate) pending: Pending,
118+
/* Sync helpers that suspended during the current resume; drained into the active Coroutine on yield-save. Lives at VM scope (not `Pending`) because it propagates across dispatch frames, not within one. */
119+
pub(crate) pending_sync_frames: Vec<types::SyncFrame>,
118120
pub(crate) yielded: bool,
119121
pub(crate) resume_ip: usize,
120122
pub output: Vec<String>,
@@ -160,6 +162,7 @@ impl<'a> VM<'a> {
160162
max_calls: limits.calls,
161163
with_stack: Vec::new(),
162164
pending: Pending::new(),
165+
pending_sync_frames: Vec::new(),
163166
yielded: false,
164167
resume_ip: 0,
165168
strict_input: false,

compiler/src/modules/vm/types/coro.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,16 @@ pub struct CoroutineHandle {
3333
pub state: CoroState,
3434
}
3535

36+
// Suspended sync helper frame: a plain user fn called from a coroutine hit a yielding builtin mid-execution, so its state is snapshotted and parked on the enclosing Coroutine. Frames stack innermost-last; resume walks inside-out so each return value lands on the next frame's stack at the Call site.
37+
#[derive(Clone, Debug)]
38+
pub struct SyncFrame {
39+
pub ip: usize,
40+
pub fi: usize,
41+
pub slots: Vec<Val>,
42+
pub stack_delta: Vec<Val>,
43+
pub iter_delta: Vec<IterFrame>,
44+
}
45+
3646
/* Call-site snapshot for traceback rendering; pushed by `exec_call`, popped on return/error. */
3747
#[derive(Clone, Debug)]
3848
pub struct CallFrame {

compiler/src/modules/vm/types/mod.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,8 @@ pub enum HeapObj {
164164
Property(Val, Val),
165165
// Intermediate produced by `prop.setter`: callable that takes a function and returns a new `Property` with the setter attached.
166166
PropertySetter(Val),
167-
Coroutine(usize, Vec<Val>, Vec<Val>, usize, Vec<IterFrame>),
167+
// Trailing `Vec<SyncFrame>` stacks suspended sync sub-calls (innermost-last): plain user fns called from this coro that hit a yielding builtin before returning. Resume walks inside-out so each return lands on the next frame's stack at the Call site.
168+
Coroutine(usize, Vec<Val>, Vec<Val>, usize, Vec<IterFrame>, Vec<SyncFrame>),
168169
/* Produced by `import m`; attr access via LoadAttr, calls fuse through CallMethod. */
169170
Module(String, Vec<(String, Val)>),
170171
/* A native binding lifted to a first-class callable. */
@@ -308,7 +309,7 @@ pub(crate) fn for_each_val(obj: &HeapObj, mut f: impl FnMut(Val)) {
308309
f(*cls);
309310
for (k, v) in attrs.borrow().iter() { f(k); f(v); }
310311
}
311-
HeapObj::Coroutine(_, slots, stack, _, iters) => {
312+
HeapObj::Coroutine(_, slots, stack, _, iters, sub_frames) => {
312313
for &v in slots { f(v); }
313314
for &v in stack { f(v); }
314315
for fr in iters { match fr {
@@ -317,6 +318,16 @@ pub(crate) fn for_each_val(obj: &HeapObj, mut f: impl FnMut(Val)) {
317318
IterFrame::UserDefined(v) => f(*v),
318319
IterFrame::Range { .. } => {}
319320
}}
321+
for sf in sub_frames {
322+
for &v in &sf.slots { f(v); }
323+
for &v in &sf.stack_delta { f(v); }
324+
for fr in &sf.iter_delta { match fr {
325+
IterFrame::Seq { items, .. } => for &v in items { f(v); },
326+
IterFrame::Coroutine(v) => f(*v),
327+
IterFrame::UserDefined(v) => f(*v),
328+
IterFrame::Range { .. } => {}
329+
}}
330+
}
320331
}
321332
HeapObj::Func(_, defaults, captures) => {
322333
for &v in defaults { f(v); }

0 commit comments

Comments
 (0)