Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,13 @@ thiserror = "1.0"
libc = "0.2"
crossbeam-utils = "0.8"
serde = "1.0"
crossbeam-channel = "0.5"
opentelemetry = { version = "0.17", features = ["rt-tokio"] }
opentelemetry-zipkin = { version = "0.15", features = ["reqwest-client"], default-features = false }
opentelemetry-jaeger = { version = "0.16.0", features = ["rt-tokio", "reqwest_collector_client"] }
tracing-opentelemetry = "0.17.3"
tracing = "0.1.35"
tracing-subscriber = "0.3.11"

[dependencies.mlua]
version = "0.6"
Expand Down
6 changes: 5 additions & 1 deletion examples/bench/init.lua
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
local txapi = require('bench')
txapi.start({}) -- will use default
txapi.start({
fibers = 16,
max_batch = 16,
runtime = { type = "cur_thread" },
}) -- will use default
-- txapi.start({buffer = 128 })
-- txapi.start({buffer = 128, runtime = { type = "cur_thread" }})
-- txapi.start({buffer = 128, runtime = { type = "multi_thread" }}) -- default
Expand Down
16 changes: 14 additions & 2 deletions examples/bench/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,20 @@ use tokio::time::Instant;
use xtm_rust::{run_module, Dispatcher, ModuleConfig};

async fn module_main(dispatcher: Dispatcher) {
let iterations = 10_000_000;
tokio::spawn({
let dispatcher = dispatcher.try_clone().unwrap();
async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(1));
loop {
println!("task_queue: {:>3}", dispatcher.len());
interval.tick().await;
}
}
});

let iterations = 4_000_000;

let worker_n = 6;
let worker_n = 16;
let iterations_per_worker = iterations / worker_n;
let mut workers = Vec::new();

Expand Down Expand Up @@ -57,6 +68,7 @@ fn bench(lua: &Lua) -> LuaResult<LuaTable> {
"start",
lua.create_function_mut(|lua, (config,): (LuaValue,)| {
let config: ModuleConfig = lua.from_value(config)?;
println!("{:?}", config);

run_module(module_main, config, lua).map_err(LuaError::external)
})?,
Expand Down
12 changes: 7 additions & 5 deletions src/config.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use serde::{Deserialize, Serialize};
use tokio::runtime;

#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(default)]
pub struct ModuleConfig {
pub buffer: usize,
pub fibers: usize,
pub max_recv_retries: usize,
pub max_batch: usize,
pub coio_timeout: f64,
pub fiber_standby_timeout: f64,
pub runtime: RuntimeConfig,
}

Expand All @@ -16,14 +17,15 @@ impl Default for ModuleConfig {
Self {
buffer: 128,
fibers: 16,
max_recv_retries: 100,
coio_timeout: 1.0,
max_batch: 16,
coio_timeout: 0.1,
fiber_standby_timeout: 1.0,
runtime: RuntimeConfig::default(),
}
}
}

#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(tag = "type")]
pub enum RuntimeConfig {
#[serde(rename(deserialize = "cur_thread"))]
Expand Down
155 changes: 155 additions & 0 deletions src/fiber_pool.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
use std::{collections::LinkedList, rc::Rc, time::Duration};

use crossbeam_channel::{unbounded, TryRecvError};
use mlua::Lua;
use tarantool::fiber;
use tracing_opentelemetry::OpenTelemetrySpanExt;
// use tracing;

use crate::{ChannelError, Executor, ModuleConfig, Task, InstrumentedTask};

/// Arguments handed to the scheduler fiber when it is started.
struct SchedulerArgs<'a> {
    lua: &'a Lua,
    executor: Executor,
    config: ModuleConfig,
}

/// Scheduler fiber entry point: spawns `fibers` worker fibers, then pumps
/// batches of tasks from the executor channel into the workers' local
/// crossbeam channel, waking workers via a `fiber::Cond`.
///
/// Returns 0 on clean shutdown (all dispatchers dropped), -1 on error.
fn scheduler_f(args: Box<SchedulerArgs>) -> i32 {
    let SchedulerArgs {
        lua,
        executor,
        config:
            ModuleConfig {
                max_batch,
                coio_timeout,
                fibers,
                fiber_standby_timeout,
                ..
            },
    } = *args;

    let cond = Rc::new(fiber::Cond::new());
    let (tx, rx) = unbounded::<InstrumentedTask>();

    let mut workers = LinkedList::new();
    for _ in 0..fibers {
        let mut worker = fiber::Fiber::new("worker", &mut worker_f);
        worker.set_joinable(true);
        worker.start(WorkerArgs {
            cond: cond.clone(),
            lua,
            rx: rx.clone(),
            fiber_standby_timeout,
        });
        workers.push_back(worker);
    }
    // Drop the scheduler's own receiver: the only live receivers are now the
    // worker clones. Without this, `tx.send` could never fail and the death
    // of every worker would go unnoticed (the old `unwrap()` was dead code).
    drop(rx);

    let result = 'sched: loop {
        let tasks = match executor.pop_many(max_batch, coio_timeout) {
            Ok(tasks) => tasks,
            // All dispatchers are gone: normal shutdown.
            Err(ChannelError::RXChannelClosed) => break Ok(()),
            Err(err) => break Err(err),
        };

        for task in tasks {
            // A send error means every worker fiber has exited, so there is
            // nobody left to run tasks — report an error instead of panicking.
            // NOTE(review): TXChannelClosed is the closest existing variant;
            // confirm against ChannelError's intended semantics.
            if tx.send(task).is_err() {
                break 'sched Err(ChannelError::TXChannelClosed);
            }
            cond.signal();
        }
    };

    // Gracefully stop fibers: closing the channel makes workers observe
    // `Disconnected`; the broadcast wakes any worker parked on the cond.
    drop(tx);
    cond.broadcast();

    for worker in workers {
        worker.join();
    }

    match result {
        Ok(_) => 0,
        Err(_) => -1,
    }
}

struct WorkerArgs<'a> {
cond: Rc<fiber::Cond>,
lua: &'a Lua,
rx: crossbeam_channel::Receiver<InstrumentedTask>,
fiber_standby_timeout: f64,
}
fn worker_f(args: Box<WorkerArgs>) -> i32 {
let WorkerArgs {
cond,
lua,
rx,
fiber_standby_timeout,
} = *args;
let fiber_standby_timeout = Duration::from_secs_f64(fiber_standby_timeout);

let thread_func = lua
.create_function(move |lua, _: ()| {
loop {
match rx.try_recv() {
Ok((func, span_ctx)) => match {
let span = tracing::span!(tracing::Level::TRACE, "fiber pool: exec");
span.set_parent(span_ctx);
let _ = span.enter();

func(lua, span.context())
} {
Ok(()) => (),
Err(ChannelError::TXChannelClosed) => continue,
Err(err) => break Err(mlua::Error::external(err)),
},
Err(TryRecvError::Disconnected) => break Ok(()),
Err(TryRecvError::Empty) => {
let signaled = cond.wait_timeout(fiber_standby_timeout);
// if !signaled {
// // kill fiber
// break Ok(());
// }
}
}
}
})
.unwrap();
let thread = lua.create_thread(thread_func).unwrap();
match thread.resume(()) {
Ok(()) => 0,
Err(_) => -1,
}
}

/// Pool of tarantool fibers driven by a single scheduler fiber.
///
/// Owns the scheduler fiber handle; worker fibers are spawned by the
/// scheduler itself (see `scheduler_f`).
pub(crate) struct FiberPool<'a> {
    lua: &'a Lua,
    executor: Executor,
    config: ModuleConfig,

    scheduler: fiber::Fiber<'a, SchedulerArgs<'a>>,
}

impl<'a> FiberPool<'a> {
    /// Creates the pool and its (not yet started) joinable scheduler fiber.
    pub fn new(lua: &'a Lua, executor: Executor, config: ModuleConfig) -> Self {
        let mut sched = fiber::Fiber::new("scheduler", &mut scheduler_f);
        sched.set_joinable(true);
        Self {
            lua,
            executor,
            config,
            scheduler: sched,
        }
    }

    /// Starts the scheduler fiber, handing it a clone of the executor and
    /// the pool configuration. Fails if the executor cannot be cloned.
    pub fn run(&mut self) -> std::io::Result<()> {
        let args = SchedulerArgs {
            lua: self.lua,
            executor: self.executor.try_clone()?,
            config: self.config.clone(),
        };
        self.scheduler.start(args);
        Ok(())
    }

    /// Blocks until the scheduler fiber exits — i.e. when every Dispatcher
    /// has been dropped.
    pub fn join(&self) {
        self.scheduler.join();
    }
}
42 changes: 10 additions & 32 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,54 +4,34 @@ use std::future::Future;
use crossbeam_utils::thread;
use mlua::Lua;
use tokio::runtime;

pub use txapi::*;
use tarantool::fiber::Fiber;

mod eventfd;
mod txapi;
mod fiber_pool;

mod config;
pub use config::*;

use tokio::sync::Notify;
use std::sync::Arc;

pub fn run_module<Fut, Func>(
module_main: Func,
config: ModuleConfig,
lua: &Lua,
notifier : Arc<Notify>,
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

может просто добавим generic чтобы можно было что угодно передавать?

) -> io::Result<Fut::Output>
where
Func: FnOnce(Dispatcher) -> Fut,
Func: FnOnce(Dispatcher, Arc<Notify>) -> Fut,
Func: Send,
Fut: Future,
Fut::Output: Send,
{
let (dispatcher, executor) = channel(config.buffer)?;

let executor_loop = &mut |args: Box<(&Lua, Executor)>| {
let (lua, executor) = *args;

let thread_func = lua.create_function(move |lua, _: ()| {
Ok(loop {
match executor.exec(lua, config.max_recv_retries, config.coio_timeout) {
Ok(_) => continue,
Err(ChannelError::TXChannelClosed) => continue,
Err(ChannelError::RXChannelClosed) => break 0,
Err(_err) => break -1,
}
})
}).unwrap();
let thread = lua.create_thread(thread_func).unwrap();
thread.resume(()).unwrap()
};

// UNSAFE: fibers must die inside the current function
let mut fibers = Vec::with_capacity(config.fibers);
for _ in 0..config.fibers {
let mut fiber = Fiber::new("xtm", executor_loop);
fiber.set_joinable(true);
fiber.start((lua, executor.try_clone()?));
fibers.push(fiber);
}
let mut fiber_pool = fiber_pool::FiberPool::new(lua, executor, config.clone());
fiber_pool.run()?;

let result = thread::scope(|scope| {
let module_thread = scope
Expand All @@ -63,13 +43,11 @@ where
.enable_time()
.build()?;

Ok(rt.block_on(module_main(dispatcher)))
Ok(rt.block_on(module_main(dispatcher, notifier)))
})
.unwrap();

for fiber in &fibers {
fiber.join();
}
fiber_pool.join();
module_thread.join().unwrap().unwrap()
})
.unwrap();
Expand Down
Loading