Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions ostool/src/run/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,8 @@ pub mod tftp;
/// U-Boot bootloader integration.
pub mod uboot;

/// Shared byte-stream matcher for runtime output detection.
mod output_matcher;

/// OVMF prebuilt firmware downloader (internal).
mod ovmf_prebuilt;
114 changes: 114 additions & 0 deletions ostool/src/run/output_matcher.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
use std::time::{Duration, Instant};

use anyhow::anyhow;
use regex::Regex;

pub(crate) const MATCH_DRAIN_DURATION: Duration = Duration::from_millis(500);

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum StreamMatchKind {
Success,
Fail,
}

#[derive(Debug, Clone)]
pub(crate) struct StreamMatch {
pub(crate) kind: StreamMatchKind,
pub(crate) matched_regex: String,
pub(crate) matched_text: String,
pub(crate) deadline: Instant,
}

#[derive(Debug, Clone)]
enum StreamMatchState {
Pending,
Matched(StreamMatch),
}

pub(crate) struct ByteStreamMatcher {
success_regex: Vec<Regex>,
fail_regex: Vec<Regex>,
line_buf: Vec<u8>,
state: StreamMatchState,
}

impl ByteStreamMatcher {
pub(crate) fn new(success_regex: Vec<Regex>, fail_regex: Vec<Regex>) -> Self {
Self {
success_regex,
fail_regex,
line_buf: Vec::with_capacity(0x1000),
state: StreamMatchState::Pending,
}
}

pub(crate) fn observe_byte(&mut self, byte: u8) -> Option<StreamMatch> {
self.line_buf.push(byte);

let first_match = match self.state {
StreamMatchState::Pending => {
Comment on lines +46 to +49
let line = String::from_utf8_lossy(&self.line_buf);

let matched = self
.fail_regex
.iter()
Comment on lines +50 to +54
.find(|regex| regex.is_match(&line))
.map(|regex| StreamMatch {
kind: StreamMatchKind::Fail,
matched_regex: regex.as_str().to_string(),
matched_text: line.to_string(),
deadline: Instant::now() + MATCH_DRAIN_DURATION,
})
.or_else(|| {
self.success_regex
.iter()
.find(|regex| regex.is_match(&line))
.map(|regex| StreamMatch {
kind: StreamMatchKind::Success,
matched_regex: regex.as_str().to_string(),
matched_text: line.to_string(),
deadline: Instant::now() + MATCH_DRAIN_DURATION,
})
});

if let Some(matched) = matched {
self.state = StreamMatchState::Matched(matched.clone());
Some(matched)
} else {
None
Comment on lines +74 to +78
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Keep scanning a matched line until newline before deciding outcome

observe_byte() now freezes the result on the first regex that matches any prefix of the current line. That changes the old semantics in both runners: previously they evaluated the completed line and gave fail patterns priority, but now an early success like "passed" will win permanently even if the same line later contains "FAILED" before the newline arrives. In runs where success and failure tokens can appear on one status line, this can turn a real failure into a reported success.

Useful? React with 👍 / 👎.

}
}
StreamMatchState::Matched(_) => None,
};

if byte == b'\n' {
self.line_buf.clear();
}

first_match
}

pub(crate) fn matched(&self) -> Option<&StreamMatch> {
match &self.state {
StreamMatchState::Pending => None,
StreamMatchState::Matched(matched) => Some(matched),
}
}

pub(crate) fn should_stop(&self) -> bool {
self.matched()
.is_some_and(|matched| Instant::now() >= matched.deadline)
}

pub(crate) fn final_result(&self) -> Option<anyhow::Result<()>> {
let matched = self.matched()?;
match matched.kind {
StreamMatchKind::Success => Some(Ok(())),
StreamMatchKind::Fail => Some(Err(anyhow!(
"Detected fail pattern '{}' in output: {}",
matched.matched_regex,
matched.matched_text.trim_end()
))),
}
}
}
138 changes: 78 additions & 60 deletions ostool/src/run/qemu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ use std::{
path::Path,
path::PathBuf,
process::{Child, Stdio},
sync::mpsc,
thread,
time::Duration,
};

use anyhow::{Context, anyhow};
Expand All @@ -38,7 +41,10 @@ use tokio::fs;

use crate::{
ctx::AppContext,
run::ovmf_prebuilt::{Arch, FileType, Prebuilt, Source},
run::{
output_matcher::{ByteStreamMatcher, StreamMatch, StreamMatchKind},
ovmf_prebuilt::{Arch, FileType, Prebuilt, Source},
},
utils::PathResultExt,
};

Expand Down Expand Up @@ -254,37 +260,15 @@ impl QemuRunner {
}
}
cmd.stdout(Stdio::piped());
cmd.stderr(Stdio::piped());
Comment on lines 262 to +263
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Drain QEMU stderr while the guest is still running

Piping stderr here is a regression because process_output_stream() only consumes child.stdout; stderr is left unread until wait_with_output() after the loop finishes. For long-running guests that emit enough stderr (for example with QEMU debug flags, firmware warnings, or repeated device errors), the stderr pipe can fill, block QEMU, and hang the run before any success/fail regex is reached.

Useful? React with 👍 / 👎.

Comment on lines 262 to +263
cmd.print_cmd();
let mut child = cmd.spawn()?;

let mut qemu_result: Option<anyhow::Result<()>> = None;

let stdout = BufReader::new(child.stdout.take().unwrap());
let mut line_buf = Vec::new();

for byte in stdout.bytes() {
let byte = match byte {
Ok(b) => b,
Err(e) => {
println!("stdout: {:?}", e);
continue;
}
};
let _ = std::io::stdout().write_all(&[byte]);
let _ = std::io::stdout().flush();

line_buf.push(byte);
if byte != b'\n' {
continue;
}

let line = String::from_utf8_lossy(&line_buf).to_string();

self.check_output(&line, &mut child, &mut qemu_result)?;
}
let mut matcher =
ByteStreamMatcher::new(self.success_regex.clone(), self.fail_regex.clone());
Self::process_output_stream(&mut child, &mut matcher)?;

let out = child.wait_with_output()?;
if let Some(res) = qemu_result {
if let Some(res) = matcher.final_result() {
res?;
} else if !out.status.success() {
unsafe {
Expand Down Expand Up @@ -430,48 +414,82 @@ impl QemuRunner {
}
}

fn check_output(
&self,
out: &str,
fn process_output_stream(
child: &mut Child,
res: &mut Option<anyhow::Result<()>>,
matcher: &mut ByteStreamMatcher,
) -> anyhow::Result<()> {
// // Process QEMU output line here
// println!("{}", line);

for regex in &self.fail_regex {
if regex.is_match(out) {
*res = Some(Err(anyhow!(
"Detected failure pattern '{}' in QEMU output.",
regex.as_str()
)));

self.kill_qemu(child)?;
return Ok(());
let stdout = child
.stdout
.take()
.context("failed to capture QEMU stdout")?;
let (tx, rx) = mpsc::channel();

thread::spawn(move || {
let stdout = BufReader::new(stdout);
for byte in stdout.bytes() {
match byte {
Ok(byte) => {
if tx.send(Some(byte)).is_err() {
return;
}
}
Err(err) => {
eprintln!("stdout: {err:?}");
return;
}
}
}
}

for regex in &self.success_regex {
if regex.is_match(out) {
*res = Some(Ok(()));
println!(
"{}",
format!(
"Detected success pattern '{}' in QEMU output, terminating QEMU.",
regex.as_str()
)
.green()
);
self.kill_qemu(child)?;
return Ok(());
let _ = tx.send(None);
});

loop {
match rx.recv_timeout(Duration::from_millis(10)) {
Ok(Some(byte)) => {
let _ = std::io::stdout().write_all(&[byte]);
let _ = std::io::stdout().flush();

if let Some(matched) = matcher.observe_byte(byte) {
Self::print_match_event(&matched);
}
}
Ok(None) => break,
Err(mpsc::RecvTimeoutError::Timeout) => {}
Err(mpsc::RecvTimeoutError::Disconnected) => break,
}

if matcher.should_stop() {
Self::kill_qemu(child)?;
break;
}
}

Ok(())
}

fn kill_qemu(&self, child: &mut Child) -> anyhow::Result<()> {
child.kill()?;
fn print_match_event(matched: &StreamMatch) {
match matched.kind {
StreamMatchKind::Success => println!(
"{}",
format!(
"\n=== SUCCESS PATTERN MATCHED: {} ===",
matched.matched_regex
)
.green()
),
StreamMatchKind::Fail => println!(
"{}",
format!("\n=== FAIL PATTERN MATCHED: {} ===", matched.matched_regex).red()
),
}
}

fn kill_qemu(child: &mut Child) -> anyhow::Result<()> {
if let Err(err) = child.kill()
&& err.kind() != ErrorKind::InvalidInput
{
return Err(err.into());
}

// 尝试恢复终端状态
let _ = disable_raw_mode();
Expand Down
64 changes: 45 additions & 19 deletions ostool/src/run/uboot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ use uboot_shell::UbootShell;

use crate::{
ctx::AppContext,
run::tftp,
run::{
output_matcher::{ByteStreamMatcher, MATCH_DRAIN_DURATION, StreamMatchKind},
tftp,
},
sterm::SerialTerm,
utils::{PathResultExt, replace_env_placeholders},
};
Expand Down Expand Up @@ -483,30 +486,53 @@ impl Runner {

println!("{}", "Interacting with U-Boot shell...".green());

let success_regex = self.success_regex.clone();
let fail_regex = self.fail_regex.clone();
let matcher = Arc::new(Mutex::new(ByteStreamMatcher::new(
self.success_regex.clone(),
self.fail_regex.clone(),
)));

let res = Arc::new(Mutex::<Option<anyhow::Result<()>>>::new(None));
let res_clone = res.clone();
let mut shell = SerialTerm::new(tx, rx, move |h, line| {
for regex in success_regex.iter() {
if regex.is_match(line) {
println!("{}", "\r\n=== SUCCESS PATTERN MATCHED ===".green());
h.stop();
let mut res_lock = res_clone.lock().unwrap();
*res_lock = Some(Ok(()));
return;
let matcher_clone = matcher.clone();
let mut shell = SerialTerm::new_with_byte_callback(tx, rx, move |h, byte| {
let mut matcher = matcher_clone.lock().unwrap();
if let Some(matched) = matcher.observe_byte(byte) {
match matched.kind {
StreamMatchKind::Success => {
println!(
"{}",
format!(
"\r\n=== SUCCESS PATTERN MATCHED: {} ===",
matched.matched_regex
)
.green()
);
let mut res_lock = res_clone.lock().unwrap();
*res_lock = Some(Ok(()));
}
StreamMatchKind::Fail => {
println!(
"{}",
format!(
"\r\n=== FAIL PATTERN MATCHED: {} ===",
matched.matched_regex
)
.red()
);
let mut res_lock = res_clone.lock().unwrap();
*res_lock = Some(Err(anyhow!(
"Fail pattern matched '{}': {}",
matched.matched_regex,
matched.matched_text.trim_end()
)));
}
}

h.stop_after(MATCH_DRAIN_DURATION);
}

for regex in fail_regex.iter() {
if regex.is_match(line) {
println!("{}", "\r\n=== FAIL PATTERN MATCHED ===".red());
h.stop();
let mut res_lock = res_clone.lock().unwrap();
*res_lock = Some(Err(anyhow!("Fail pattern matched: {}", line)));
return;
}
if matcher.should_stop() {
h.stop();
}
});
shell.run().await?;
Expand Down
Loading