С помощью футур, решал задачу загрузки данных в микроконтроллер и параллельного опроса его состояния. И первое и второе это две отдельные задачи которые выполняет
tokio::runtime::current_thread::Runtime
Так вот, если каждый io оборачивать в FutureResult, то производительность загрузки данных в микроконтроллер была низкой.
Здесь критерием производительности был тот факт - что микроконтроллер “останавливался” когда данные вовремя не поступали, когда внутренний буфер оказывался пустым.
Я стал постепенно группировать io - увеличивая долю std::io::Result
и уменьшая FutureResult
в коде, пока не добился приемлемой производительности.
код который работает медленно:
pub fn load<'a>(
cnc: Rc<RefCell<Controller>>,
blocks: Option<Blocks>,
) -> impl Future<Item = Option<Blocks>, Error = Error> + 'a {
enum LocalErrorKind {
Empty,
Cnc(Error),
}
result(blocks.ok_or(LocalErrorKind::Empty))
.and_then(move |blocks| {
loop_fn(iter_ok::<_, ()>(blocks).into_future(), move |future| {
let cnc_clone = cnc.clone();
future
.map_err(|_| LocalErrorKind::Empty)
.and_then(|(block, stream)| {
block
.ok_or(LocalErrorKind::Empty)
.map(move |block| (block, stream))
}).and_then(move |(block, stream)| {
let r = cnc_clone.borrow_mut().raw_load(&block);
result(r).then(move |r| match r {
Ok(_) => Ok(Loop::Continue(stream.into_future())),
Err(Error(ErrorKind::DeviceBusy, _)) => {
let blocks: Blocks = stream.collect().wait().unwrap().into();
info!(
"Out of controller memory, loading will be delayed, left {}",
blocks.len()
);
Ok(Loop::Break(Some(blocks)))
}
Err(err) => Err(LocalErrorKind::Cnc(err)),
})
})
})
}).or_else(|err| match err {
LocalErrorKind::Empty => Ok(None),
LocalErrorKind::Cnc(err) => Err(err),
})
}
pub fn run_blocks(
cnc: Rc<RefCell<Controller>>,
blocks: Blocks,
) -> impl Future<Item = (), Error = Error> {
enum LocalErrorKind {
Timer(tokio::timer::Error),
Cnc(Error),
Stopped,
}
let cnc_start = cnc.clone();
let cnc_pre_load = cnc.clone();
let cnc_load = cnc.clone();
result(
cnc.borrow_mut()
.free()
.map_err(|err| LocalErrorKind::Cnc(err)),
).and_then(move |_| load(cnc_pre_load, Some(blocks)).map_err(|err| LocalErrorKind::Cnc(err)))
.and_then(move |blocks| {
let r = cnc_start.borrow_mut().start();
r.map_err(|err| LocalErrorKind::Cnc(err))
.map(move |_| blocks)
}).and_then(move |blocks| {
loop_fn(blocks, move |blocks| {
let cnc_load_clone = cnc_load.clone();
Interval::new_interval(Duration::from_secs(1))
.into_future()
.map_err(|(err, _)| LocalErrorKind::Timer(err))
.and_then(move |_| {
load(cnc_load_clone, blocks).map_err(|err| LocalErrorKind::Cnc(err))
}).and_then(|blocks| match blocks {
None => Ok(Loop::Break(())),
e @ _ => Ok(Loop::Continue(e)),
})
})
}).or_else(|err| match err {
LocalErrorKind::Cnc(err) => Err(err),
LocalErrorKind::Timer(err) => {
error!("Timer error: {}", err);
Ok(())
}
LocalErrorKind::Stopped => {
error!("Controller stopped, load blocks is not necessary");
Ok(())
}
})
}
прошу не цепляться к let blocks: Blocks = stream.collect().wait().unwrap().into();
, было и по другому но с производительностью те-же проблемы.
теперь код который имеет приемлемую производительность:
pub fn full_load(
cnc: &mut Controller,
blocks: Option<Blocks>,
) -> Result<(Option<Blocks>, Duration), Error> {
let mut duration = Duration::new(0, 0);
if let Some(mut blocks) = blocks {
let mut counter = 0;
for block in blocks.iter() {
match cnc.raw_load(&block) {
Ok(_) => {
counter += 1;
duration = duration
.checked_add(block.runtime())
.unwrap_or(Duration::from_secs(std::u64::MAX));
}
Err(Error(ErrorKind::DeviceBusy, _)) => {
debug!(
"Loaded {}-blocks - {:?}, loading will be delayed",
counter, duration
);
break;
}
Err(err) => {
return Err(err);
}
}
}
blocks.remove_front(counter);
if blocks.is_empty() {
Ok((None, duration))
} else {
Ok((Some(blocks), duration))
}
} else {
Ok((None, duration))
}
}
pub fn run_blocks(
cnc: Rc<RefCell<Controller>>,
blocks: Blocks,
) -> impl Future<Item = (), Error = Error> {
enum LocalErrorKind {
Timer(tokio::timer::Error),
Cnc(Error),
Stopped,
}
let cnc_load = cnc.clone();
result({
let mut cnc = cnc.borrow_mut();
cnc.deref_mut()
.free()
.and_then(|_| full_load(cnc.deref_mut(), Some(blocks)))
.and_then(|load| cnc.deref_mut().start().map(|_| load))
.map_err(|err| LocalErrorKind::Cnc(err))
})
.and_then(move |loaded| {
loop_fn(loaded, move |(blocks, runtime)| {
let now = Instant::now();
let cnc_load_clone = cnc_load.clone();
let sleep = runtime / 2;
debug!("Load will be delay: {:?}", sleep);
Delay::new(Instant::now() + sleep)
.map_err(|err| LocalErrorKind::Timer(err))
.and_then(move |_| {
let mut cnc = cnc_load_clone.borrow_mut();
cnc.deref_mut()
.get_state()
.map_err(|err| LocalErrorKind::Cnc(err))
.and_then(|state| {
if state.is_run() {
Ok(())
} else {
Err(LocalErrorKind::Stopped)
}
})
.and_then(|_| {
full_load(cnc.deref_mut(), blocks)
.map_err(|err| LocalErrorKind::Cnc(err))
})
.map(|(blocks, time)| match blocks {
None => {
debug!("All block is loaded");
Loop::Break(())
}
e @ _ => Loop::Continue((
e,
runtime
.checked_add(time)
.or(Some(Duration::from_secs(std::u64::MAX)))
.and_then(|sum| sum.checked_sub(now.elapsed()))
.unwrap_or(Duration::new(0, 0)),
)),
})
})
})
})
.or_else(|err| match err {
LocalErrorKind::Cnc(err) => Err(err),
LocalErrorKind::Timer(err) => {
error!("Timer error: {}", err);
Ok(())
}
LocalErrorKind::Stopped => {
error!("Controller stopped, load blocks is not necessary");
Ok(())
}
})
}
Может кто-нибудь это прокомментировать?
И по поводу стратегии выполнения AndThen<A, B, F>, я правильно понимаю что если A.poll()
вернула Async::Ready()
, тут-же будет вызвано B.poll()
без передачи управления executor’у?
код