Futures: запутался в комбинаторах


#1

Код ниже не компилируется — ошибка «type mismatch resolving» (E0271):

/// Builds a future that loops over `irq_rx`: each time the stream future
/// resolves, it calls `clear_events()` on the shared controller and hands the
/// resulting state to `event_sender` together with a clone of `event_tx`.
///
/// Failures on the send or the receive side are logged with `error!` and
/// turned into `Loop::Break(())`, which ends the loop.
///
/// NOTE(review): this version does not compile. Per the E0271 diagnostic
/// quoted after this snippet, the future returned from the outer `and_then`
/// closure has error type `controller::errors::Error`, while the
/// `irq_rx.into_future()` future it is chained onto fails with
/// `((), Receiver<usize>)`. `and_then` needs both error types to be the same,
/// so they have to be mapped to a common type first.
pub fn state_dispatcher(
    cnc: Rc<RefCell<Controller>>,
    irq_rx: Receiver<usize>,
    event_tx: Sender<Event>,
) -> impl Future<Item = (), Error = Error> {
    // The loop state is the "next item" future of the IRQ stream.
    loop_fn(irq_rx.into_future(), |future| {
        // Each iteration gets its own sender handle to move into the chain.
        let tx = event_tx.clone();
        future
            // `stream` is the remainder of the IRQ stream; it is re-armed below.
            .and_then(|(_count, stream)| {
                result(cnc.borrow_mut().clear_events())
                    .and_then(|state| {
                        // Success: keep looping on the rest of the stream.
                        // Send failure: log it and stop the loop without an error.
                        event_sender(state, tx).map(|_| Loop::Continue(stream.into_future())).or_else(|err| {
                            error!("Send event error: {:?}", err);
                            Ok(Loop::Break(()))
                        })
                    })
            // This is the `or_else` reported by E0599: the `AndThen` above does
            // not implement `Future` because of the error type mismatch.
            }).or_else(|err| {
                error!("IRQ channel receive error: {:?}", err);
                Ok(Loop::Break(()))
            })
    }).map(|_| ())
}
error[E0271]: type mismatch resolving `<futures::AndThen<futures::FutureResult<controller::State, controller::errors::Error>, futures::OrElse<futures::Map<impl futures::Future, [closure@src/lib.rs:114:53: 114:93 stream:_]>, std::result::Result<futures::future::Loop<(), futures::stream::StreamFuture<futures::sync::mpsc::Receiver<usize>>>, controller::errors::Error>, [closure@src/lib.rs:114:103: 117:26]>, [closure@src/lib.rs:113:31: 118:22 tx:_, stream:_]> as futures::IntoFuture>::Error == ((), futures::sync::mpsc::Receiver<usize>)`
   --> src/lib.rs:111:14
    |
111 |             .and_then(|(_count, stream)| {
    |              ^^^^^^^^ expected struct `controller::errors::Error`, found tuple
    |
    = note: expected type `controller::errors::Error`
               found type `((), futures::sync::mpsc::Receiver<usize>)`

error[E0599]: no method named `or_else` found for type `futures::AndThen<futures::stream::StreamFuture<futures::sync::mpsc::Receiver<usize>>, futures::AndThen<futures::FutureResult<controller::State, controller::errors::Error>, futures::OrElse<futures::Map<impl futures::Future, [closure@src/lib.rs:114:53: 114:93 stream:_]>, std::result::Result<futures::future::Loop<(), futures::stream::StreamFuture<futures::sync::mpsc::Receiver<usize>>>, controller::errors::Error>, [closure@src/lib.rs:114:103: 117:26]>, [closure@src/lib.rs:113:31: 118:22 tx:_, stream:_]>, [closure@src/lib.rs:111:23: 119:14 cnc:_, tx:_]>` in the current scope
   --> src/lib.rs:119:16
    |
119 |             }).or_else(|err| {
    |                ^^^^^^^
    |
    = note: the method `or_else` exists but the following trait bounds were not satisfied:
            `futures::AndThen<futures::stream::StreamFuture<futures::sync::mpsc::Receiver<usize>>, futures::AndThen<futures::FutureResult<controller::State, controller::errors::Error>, futures::OrElse<futures::Map<impl futures::Future, [closure@src/lib.rs:114:53: 114:93 stream:_]>, std::result::Result<futures::future::Loop<(), futures::stream::StreamFuture<futures::sync::mpsc::Receiver<usize>>>, controller::errors::Error>, [closure@src/lib.rs:114:103: 117:26]>, [closure@src/lib.rs:113:31: 118:22 tx:_, stream:_]>, [closure@src/lib.rs:111:23: 119:14 cnc:_, tx:_]> : futures::Future`
            `&mut futures::AndThen<futures::stream::StreamFuture<futures::sync::mpsc::Receiver<usize>>, futures::AndThen<futures::FutureResult<controller::State, controller::errors::Error>, futures::OrElse<futures::Map<impl futures::Future, [closure@src/lib.rs:114:53: 114:93 stream:_]>, std::result::Result<futures::future::Loop<(), futures::stream::StreamFuture<futures::sync::mpsc::Receiver<usize>>>, controller::errors::Error>, [closure@src/lib.rs:114:103: 117:26]>, [closure@src/lib.rs:113:31: 118:22 tx:_, stream:_]>, [closure@src/lib.rs:111:23: 119:14 cnc:_, tx:_]> : futures::Future`
            `&mut futures::AndThen<futures::stream::StreamFuture<futures::sync::mpsc::Receiver<usize>>, futures::AndThen<futures::FutureResult<controller::State, controller::errors::Error>, futures::OrElse<futures::Map<impl futures::Future, [closure@src/lib.rs:114:53: 114:93 stream:_]>, std::result::Result<futures::future::Loop<(), futures::stream::StreamFuture<futures::sync::mpsc::Receiver<usize>>>, controller::errors::Error>, [closure@src/lib.rs:114:103: 117:26]>, [closure@src/lib.rs:113:31: 118:22 tx:_, stream:_]>, [closure@src/lib.rs:111:23: 119:14 cnc:_, tx:_]> : futures::Stream`

playground


#2

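Сделал так: привёл ошибки всех этапов к одному типу `DispatcherError` (через `From` и `map_err`), а в конце цепочки разбираю его в `or_else`: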

/// Common error type for every stage of the dispatcher's combinator chain.
///
/// Each stage's own error is converted into this enum (see the `From` impls)
/// so that `and_then` can chain futures with a single error type.
enum DispatcherError {
    /// Sending an event failed; carries the original send error.
    Sender(TrySendError<Event>),
    /// The IRQ receiver's stream future failed; the payload is discarded.
    Receiver,
    /// The controller reported an error; carries it unchanged.
    Cnc(Error),
}

/// Wraps a failed event send in `DispatcherError::Sender`, keeping the
/// original error as the payload.
impl From<TrySendError<Event>> for DispatcherError {
    fn from(send_failure: TrySendError<Event>) -> Self {
        DispatcherError::Sender(send_failure)
    }
}

/// Converts the error of the receiver's stream future (the unit error plus
/// the receiver itself) into `DispatcherError::Receiver`.
///
/// The tuple carries no information worth keeping, so it is simply dropped.
impl From<((), Receiver<usize>)> for DispatcherError {
    fn from(_stream_failure: ((), Receiver<usize>)) -> Self {
        DispatcherError::Receiver
    }
}

impl From<Error> for DispatcherError {
    fn from(err: Error) -> Self {
        DispatcherError::Cnc(err)
    }
}

/// Builds a future that dispatches controller events for as long as IRQ
/// notifications keep arriving on `irq_rx`.
///
/// For every notification the shared controller's `clear_events()` is called
/// and the resulting state is passed to `event_sender` together with a clone
/// of `event_tx`; the loop then waits for the next notification.
///
/// # Parameters
/// * `cnc` - shared controller whose events are cleared on every IRQ.
/// * `irq_rx` - stream of IRQ notifications; the payload itself is not used.
/// * `event_tx` - channel handle cloned for each `event_sender` call.
///
/// # Termination
/// The returned future resolves with `Ok(())` when
/// * the IRQ stream ends (it yields `None`) or its future fails, or
/// * sending an event fails.
///
/// # Errors
/// An error returned by `clear_events()` is propagated unchanged and ends the
/// loop.
pub fn event_dispatcher(
    cnc: Rc<RefCell<Controller>>,
    irq_rx: Receiver<usize>,
    event_tx: Sender<Event>,
) -> impl Future<Item = (), Error = Error> {
    loop_fn(irq_rx.into_future(), move |future| {
        // Fresh handles per iteration: the chain below takes them by value.
        let cnc_clone = Rc::clone(&cnc);
        let tx = event_tx.clone();
        future
            // Every stage is mapped to `DispatcherError` so `and_then` sees a
            // single error type across the whole chain.
            .map_err(DispatcherError::from)
            .and_then(move |(irq, stream)| {
                // `None` means the IRQ stream is finished. Treat it like a
                // receiver failure so the loop stops instead of re-arming an
                // exhausted stream and spinning forever.
                let cleared = match irq {
                    Some(_) => cnc_clone
                        .borrow_mut()
                        .clear_events()
                        .map_err(DispatcherError::from),
                    None => Err(DispatcherError::Receiver),
                };
                // The `RefCell` borrow has already been released here, before
                // any further future is created or polled.
                result(cleared).and_then(move |state| {
                    event_sender(state, tx)
                        .map_err(DispatcherError::from)
                        .map(|_| Loop::Continue(stream.into_future()))
                })
            })
            // Channel-side problems end the loop quietly; controller errors
            // are handed back to the caller.
            .or_else(|err| match err {
                DispatcherError::Receiver | DispatcherError::Sender(_) => Ok(Loop::Break(())),
                DispatcherError::Cnc(err) => Err(err),
            })
    })
}