Потоки и итераторы: как "развернуть" один итератор внутри другого итератора?


#1

например, есть a = vec![1, 2, 3, 4, 5], а нужно получить итератор который для каждого a[N] будет генерировать последовательность от 0 до a[N],
то есть выдаст: 0, 1, 0, 1, 2, 0, 1, 2, 3, 0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5?

А как тоже самое сделать для потока?
Я сделал так:

    pub fn poll(self, cnc: Rc<RefCell<Controller>>) -> impl Stream<Item = Event, Error = ()> {
        let mut irq = self.irq;
        let (mut irq_tx, irq_rx) = channel(1);
        spawn(move || {
            while let Ok(events) = irq.wait(None) {
                if let Err(err) = irq_tx.try_send(events.len()) {
                    error!("{}", err);
                    break;
                }
            }
            info!("Irq shutdown");
        });

        irq_rx
            .and_then(move |_| cnc.borrow_mut().clear_events().map_err(|_| ()))
            .map(|state| iter_ok(state.iter()))
            .flatten()
    }

и поток заканчивается после перечисления первого события.


#2

fn main() {
    let range = (1..=5).map(|x| {
        (0..=x)
    }).flatten();
    let vec: Vec<i32> = range.collect();
    
    vec.iter().for_each(|x| println!("x: {}", x));
}

(Playground)

Output:

x: 0
x: 1
x: 0
x: 1
x: 2
x: 0
x: 1
x: 2
x: 3
x: 0
x: 1
x: 2
x: 3
x: 4
x: 0
x: 1
x: 2
x: 3
x: 4
x: 5

Errors:

   Compiling playground v0.0.1 (/playground)
    Finished dev [unoptimized + debuginfo] target(s) in 1.35s
     Running `target/debug/playground`

А почему ты используешь poll?


#3

мой poll создает поток, который затем “поллет” executor. Прошу прощения за путаницу.


#4

Я не вижу смысла в ручных poll. Можешь сделать чистый MRE?


Ну и spawn внутри poll - ну такое. Постарайся попробовать решить свою проблему на чистых комбинаторах без poll.


#5

ну ок, давайте переименую:

    pub fn create_task(self, cnc: Rc<RefCell<Controller>>) -> impl Stream<Item = Event, Error = ()> {
        let mut irq = self.irq;
        let (mut irq_tx, irq_rx) = channel(1);
        spawn(move || {
            while let Ok(events) = irq.wait(None) {
                if let Err(err) = irq_tx.try_send(events.len()) {
                    error!("{}", err);
                    break;
                }
            }
            info!("Irq shutdown");
        });

        irq_rx
            .and_then(move |_| cnc.borrow_mut().clear_events().map_err(|_| ()))
            .map(|state| iter_ok(state.iter()))
            .flatten()
    }

#6

Выглядит лучше. А какое поведение ты ожидаешь?


#7

когда кончился iter_ok(state.iter()) ожидаем следующего irq_rx


#8

разобрался, я неверно вызывал(устанавливал) поток,
было:

    executor.spawn(
        cnc_events
            .poll(controller)
            .map(move |event| None)
            .into_future()
            .map(|(event, _)| {
                ()
            })
            .map_err(|_| ()),
    );

здесь, футура заканчивала работать после первого получения данных из потока.

стало:

    executor.spawn(cnc_events.poll(controller).for_each(move |event| {
        ...
        Ok(())
    }));

здесь футура зациклена.


#9

Ручные poll’ы - это основная причина застрелить мейнтейнеров токио, потому что благодаря их трюкачеству в tokio/examples эта плесень расползлась по всему коммьюнити.


#10

Это не ручной poll, это другая функция с таким же именем(писал об этом выше)


#11

ОК. Но все же в мире Rust функция с именем poll уже плотно зарезервирована футурами. Ну или мне так кажется.