Futures: низкая производительность однопоточного executor'a


#1

С помощью футур, решал задачу загрузки данных в микроконтроллер и параллельного опроса его состояния. И первое и второе это две отдельные задачи которые выполняет
tokio::runtime::current_thread::Runtime

Так вот, если каждый io оборачивать в FutureResult, то производительность загрузки данных в микроконтроллер была низкой.
Здесь критерием производительности был тот факт - что микроконтроллер “останавливался” когда данные вовремя не поступали, когда внутренний буфер оказывался пустым.

Я стал постепенно группировать io - увеличивая долю std::io::Result и уменьшая FutureResult в коде, пока не добился приемлемой производительности.

код который работает медленно:

pub fn load<'a>(
    cnc: Rc<RefCell<Controller>>,
    blocks: Option<Blocks>,
) -> impl Future<Item = Option<Blocks>, Error = Error> + 'a {
    enum LocalErrorKind {
        Empty,
        Cnc(Error),
    }
    result(blocks.ok_or(LocalErrorKind::Empty))
        .and_then(move |blocks| {
            loop_fn(iter_ok::<_, ()>(blocks).into_future(), move |future| {
                let cnc_clone = cnc.clone();
                future
                    .map_err(|_| LocalErrorKind::Empty)
                    .and_then(|(block, stream)| {
                        block
                            .ok_or(LocalErrorKind::Empty)
                            .map(move |block| (block, stream))
                    }).and_then(move |(block, stream)| {
                        let r = cnc_clone.borrow_mut().raw_load(&block);
                        result(r).then(move |r| match r {
                            Ok(_) => Ok(Loop::Continue(stream.into_future())),
                            Err(Error(ErrorKind::DeviceBusy, _)) => {
                                let blocks: Blocks = stream.collect().wait().unwrap().into();
                                info!(
                                    "Out of controller memory, loading will be delayed, left {}",
                                    blocks.len()
                                );
                                Ok(Loop::Break(Some(blocks)))
                            }
                            Err(err) => Err(LocalErrorKind::Cnc(err)),
                        })
                    })
            })
        }).or_else(|err| match err {
            LocalErrorKind::Empty => Ok(None),
            LocalErrorKind::Cnc(err) => Err(err),
        })
}

pub fn run_blocks(
    cnc: Rc<RefCell<Controller>>,
    blocks: Blocks,
) -> impl Future<Item = (), Error = Error> {
    enum LocalErrorKind {
        Timer(tokio::timer::Error),
        Cnc(Error),
        Stopped,
    }
    let cnc_start = cnc.clone();
    let cnc_pre_load = cnc.clone();
    let cnc_load = cnc.clone();
    result(
        cnc.borrow_mut()
            .free()
            .map_err(|err| LocalErrorKind::Cnc(err)),
    ).and_then(move |_| load(cnc_pre_load, Some(blocks)).map_err(|err| LocalErrorKind::Cnc(err)))
    .and_then(move |blocks| {
        let r = cnc_start.borrow_mut().start();
        r.map_err(|err| LocalErrorKind::Cnc(err))
            .map(move |_| blocks)
    }).and_then(move |blocks| {
        loop_fn(blocks, move |blocks| {
            let cnc_load_clone = cnc_load.clone();
            Interval::new_interval(Duration::from_secs(1))
                .into_future()
                .map_err(|(err, _)| LocalErrorKind::Timer(err))
                .and_then(move |_| {
                    load(cnc_load_clone, blocks).map_err(|err| LocalErrorKind::Cnc(err))
                }).and_then(|blocks| match blocks {
                    None => Ok(Loop::Break(())),
                    e @ _ => Ok(Loop::Continue(e)),
                })
        })
    }).or_else(|err| match err {
        LocalErrorKind::Cnc(err) => Err(err),
        LocalErrorKind::Timer(err) => {
            error!("Timer error: {}", err);
            Ok(())
        }
        LocalErrorKind::Stopped => {
            error!("Controller stopped, load blocks is not necessary");
            Ok(())
        }
    })
}

прошу не цепляться к let blocks: Blocks = stream.collect().wait().unwrap().into();, было и по другому но с производительностью те-же проблемы.

теперь код который имеет приемлемую производительность:

pub fn full_load(
    cnc: &mut Controller,
    blocks: Option<Blocks>,
) -> Result<(Option<Blocks>, Duration), Error> {
    let mut duration = Duration::new(0, 0);
    if let Some(mut blocks) = blocks {
        let mut counter = 0;
        for block in blocks.iter() {
            match cnc.raw_load(&block) {
                Ok(_) => {
                    counter += 1;
                    duration = duration
                        .checked_add(block.runtime())
                        .unwrap_or(Duration::from_secs(std::u64::MAX));
                }
                Err(Error(ErrorKind::DeviceBusy, _)) => {
                    debug!(
                        "Loaded {}-blocks - {:?}, loading will be delayed",
                        counter, duration
                    );
                    break;
                }
                Err(err) => {
                    return Err(err);
                }
            }
        }

        blocks.remove_front(counter);
        if blocks.is_empty() {
            Ok((None, duration))
        } else {
            Ok((Some(blocks), duration))
        }
    } else {
        Ok((None, duration))
    }
}

pub fn run_blocks(
    cnc: Rc<RefCell<Controller>>,
    blocks: Blocks,
) -> impl Future<Item = (), Error = Error> {
    enum LocalErrorKind {
        Timer(tokio::timer::Error),
        Cnc(Error),
        Stopped,
    }
    let cnc_load = cnc.clone();
    result({
        let mut cnc = cnc.borrow_mut();
        cnc.deref_mut()
            .free()
            .and_then(|_| full_load(cnc.deref_mut(), Some(blocks)))
            .and_then(|load| cnc.deref_mut().start().map(|_| load))
            .map_err(|err| LocalErrorKind::Cnc(err))
    })
    .and_then(move |loaded| {
        loop_fn(loaded, move |(blocks, runtime)| {
            let now = Instant::now();
            let cnc_load_clone = cnc_load.clone();
            let sleep = runtime / 2;
            debug!("Load will be delay: {:?}", sleep);
            Delay::new(Instant::now() + sleep)
                .map_err(|err| LocalErrorKind::Timer(err))
                .and_then(move |_| {
                    let mut cnc = cnc_load_clone.borrow_mut();
                    cnc.deref_mut()
                        .get_state()
                        .map_err(|err| LocalErrorKind::Cnc(err))
                        .and_then(|state| {
                            if state.is_run() {
                                Ok(())
                            } else {
                                Err(LocalErrorKind::Stopped)
                            }
                        })
                        .and_then(|_| {
                            full_load(cnc.deref_mut(), blocks)
                                .map_err(|err| LocalErrorKind::Cnc(err))
                        })
                        .map(|(blocks, time)| match blocks {
                            None => {
                                debug!("All block is loaded");
                                Loop::Break(())
                            }
                            e @ _ => Loop::Continue((
                                e,
                                runtime
                                    .checked_add(time)
                                    .or(Some(Duration::from_secs(std::u64::MAX)))
                                    .and_then(|sum| sum.checked_sub(now.elapsed()))
                                    .unwrap_or(Duration::new(0, 0)),
                            )),
                        })
                })
        })
    })
    .or_else(|err| match err {
        LocalErrorKind::Cnc(err) => Err(err),
        LocalErrorKind::Timer(err) => {
            error!("Timer error: {}", err);
            Ok(())
        }
        LocalErrorKind::Stopped => {
            error!("Controller stopped, load blocks is not necessary");
            Ok(())
        }
    })
}

Может кто-нибудь это прокомментировать?

И по поводу стратегии выполнения AndThen<A, B, F>, я правильно понимаю что если A.poll() вернула Async::Ready(), тут-же будет вызвано B.poll() без передачи управления executor’у?
код


#2

Будет вызван match f(a_result, data)? { без передачи управления executor’у


#3

Сложно так сходу сказать. Подскажи, верно ли я понимаю:

Медленный случай:

Tick(1 second) -> 
  future::loop for block in blocks ->
    controller.raw_load(block) while !DeviceBusy
    blocks.pop_front()

Быстрый случай:

Tick(1 second) ->
  count = 0
  for block in blocks
    controller.raw_load(block) while !DeviceBusy
    count += 1
  blocks.pop_front(count)

Если есть возможность портировать код на десктом, то можно посмотреть на кол-во событий tokio. Там дофига трейсов внутри. Ну и нам было бы неплохо посмотреть на логи.


#4

Будет вызван match f(a_result, data)? { без передачи управления executor’у

Здесь нужен все-таки какой-то выбор стратегии - часто переключатся между задачами плохо, потому-что происходит переключение контекста, но и не переключатся тоже плохо потомучто растет очередь входных событий

Медленный случай:

да в принципе все так, небольшая поправка:

Tick(1 second) -> 
  future::loop for block in blocks ->
    future::result controller.raw_load(block) while !DeviceBusy
    blocks.pop_front()

Если есть возможность портировать код на десктом, то можно посмотреть на кол-во событий tokio. Там дофига трейсов внутри. Ну и нам было бы неплохо посмотреть на логи.

Портировать можно если написать заглушку.

Ну и нам было бы неплохо посмотреть на логи.

речь о perf?


#5

Речь идет о RUST_LOG=tokio=trace ./a.out. Ну или как-то так, уже забыл синтаксис. https://docs.rs/env_logger/0.6.0/env_logger/


#6

А можно подробнее? Я что-то не допёр)