Футуры: как лучше организовать динамическую цепочку обработчиков событий?


#1

Задачам(Task - в терминах futures) требуется получать подтверждения(события) операций.
Источником событий является аппаратное прерывание, которое затем разбирается на события(одно прерывание может содержать несколько событий). Таким образом получается цепочка событий.
Хотелось бы организовать динамическую цепочку обработчиков:
обработчик_события_1(&событие) -> не_мое_событие -> обработчик_события_2(&событие) -> и т.д. -> обработчик_события_по_умолчанию(&событие).
если обработчик_события_N(&событие) == мое_событие, то нужно:

  • удалить обработчик из цепочки
  • событие следующему обработчику не направляется
  • уведомить соответствующую задачу (сделать pool)

Собственно вопрос:
Цепочку событий можно представить как futures::sync::mpsc::channel,
но как организовать динамическую цепочку обработчиков не совсем понятно.

Кто нибудь сталкивался с подобной задачей? Как ее решили?


#2

Сделать futures::Stream который будет ловить аппаратное прерывание и генерить несколько Stream::Item


#3

Это сделано. Что делать дальше?
Как быть с цепочкой обработчиков?


#4

А как именно предполагается проверять, чьё это событие?

Сколько я понимаю логику futures, один возможный подход - уведомить все, пусть они проверят, к ним ли это, и если нет - зарегистрируются и уснут обратно.


#5

Динамически добавлять/удалять цепочки в общем случае не получится. Тут можно только фильтровать, сообщения. Вообще лучше показать пример кода что бы понятнее было, возможно есть другие варианты


#6

Примера кода как бы нет, поэтому и спрашиваю.
Я что-то накидал и но мне это не очень нравиться:

pub enum ConditionResult {
    Handle { release: bool },
    Not,
}

pub fn vector_of_handlers(
    rx: Receiver<Event>,
    vector: Rc<RefCell<Vec<Box<FnMut(&Event) -> ConditionResult>>>>,
) -> impl Future<Item = (), Error = ()> {
    // let own = vector.clone();
    loop_fn(rx.into_future(), move |future| {
        let handlers = vector.clone();
        future
            .and_then(move |(event, stream)| {
                result(event.ok_or(()))
                    .and_then(move |event| {
                        let mut remove: Option<usize> = None;
                        let mut handle = false;
                        for (index, handler) in handlers.borrow_mut().iter_mut().enumerate() {
                            let r = handler.deref_mut()(&event);

                            if let ConditionResult::Handle { release } = r {
                                if release {
                                    remove = Some(index);
                                }
                                handle = true;
                                break;
                            }
                        }

                        if let Some(index) = remove {
                            handlers.borrow_mut().remove(index);
                        }

                        if !handle {
                            default_handler(&event)
                        }

                        Ok(Loop::Continue(stream.into_future()))
                    }).or_else(|_| Ok(Loop::Break(())))
            }).or_else(|_| Ok(Loop::Break(())))
    })
}


#7

Можно обработчик добавлять в сам Event

struct EventHandle {
   event: Event,
   handle: Option<Box<FnOnce(Event)>> 
}

В итоге цепочка будет выглядеть как

   source_stream
     .map(add_specific_handler) // Event -> EventHandler
     .map(call_specific_handler) // if let Some(handler) ....
     .map(call_default_handler) // if handle.is_none()

#8

то есть где-то должно быть соответствие Event == EventHandler? ну что бы собрать EventHandle?